基于实时计算Flink使用CTAS&CDAS功能同步MySQL数据至StarRocks

CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步。本文为您介绍如何使用实时计算Flink平台和E-MapReduce StarRocks通过CTAS&CDAS功能实现实时数仓中TP(Transaction Processing)和AP(Analytical Processing)数据同步的场景。

背景信息

通过CTAS(CREATE TABLE AS)语句可以在StarRocks中自动创建和MySQL中表结构一致的表,并进行数据同步,还能实时同步上游表结构(Schema)的变更到下游表,提高您在目标存储中创建表和维护源表结构变更的效率。

当执行CTAS语句时,Flink会按照以下流程执行:

  1. 检查目标存储中是否存在该目标表。

    • 如果不存在,则通过目标端Catalog在目标存储中创建相应的目标表,该目标表具有和数据源相同的Schema。

    • 如果存在,则跳过建表。如果已存在的目标表与源表Schema不一致,则会报错提示。

  2. 提交和启动相应的数据同步作业。同步数据源的数据以及Schema的变更到目标表中。

表结构变更同步策略通过CTAS语句,在实时同步数据的同时,还能同步源表Schema的变更到目标表中。

Schema变更包括初始表的创建以及未来表的变更。

  • 当前支持同步的Schema变更:

    • 添加可空列:会自动在目标表Schema末尾添加对应的列,并自动同步新增列的数据。

    • 删除可空列:不会直接在目标表中删除该列,而是将该列的数据自动填充为NULL值。

    • 重命名列:被视为添加列和删除列。直接在目标表中末尾添加重命名后的列,并将重命名前的列数据自动填充为NULL值。

      例如,如果col_a重命名为col_b,则会在目标表末尾添加col_b,并自动将col_a的数据填充为NULL值。

  • 暂不支持同步的Schema变更:

    • 数据类型的变更。

      例如,由VARCHAR变为BIGINT,由NOT NULL变为NULLABLE属性。

    • 主键或索引等约束的变更。

    • 非空列的增加或删除的变更。

    • DDL中字段长度的调整。

说明
  • 如果遇到不支持的Schema变更,则需要您手动删除下游目标表,重新启动CTAS作业,即重新创建目标表并重新同步历史数据。

  • CTAS不会识别具体的DDL类型,而是对比前后两条数据的Schema差异。因此,如果您先删除了某列后,又加回了该列,且这两个DDL之间无数据变化,则CTAS会认为没有发生结构变更。同理,如果您添加了一列,直到该表有数据变化,CTAS才会感知到结构变更,才会同步结构变更到目标表。

  • 通过CTAS建表支持的字段类型信息,请参见Flink与StarRocks的数据类型映射关系

  • 在使用CTAS语句合并MySQL多张表时,默认情况下,系统会自动在生成的新表结构最前面添加_db_name_table_name两列,用来追踪源数据表信息。由于这一自动添加行为不可更改,您在定义新表的列顺序时,请直接从第三列开始定义您期望的列顺序,以确保新表结构符合预期。

前提条件

说明

本文以5.7版本的MySQL、EMR-3.39.1版本的StarRocks集群和1.15-6.0.3版本的Flink为例介绍。

使用限制

  • 创建的Flink集群、StarRocks集群以及RDS MySQL实例需要在同一个VPC下。

  • RDS MySQL须为5.7及以上版本。

  • StarRocks须开启公网访问。

  • Flink集群中的Flink须为1.15-vvr-6.0.3及以上版本。

步骤一:准备测试数据

  1. 创建测试的数据库和账号,详情请参见创建数据库和账号

    创建完数据库和账号后,需要授权测试账号的读写权限。

    说明

    本文创建的数据库名称为test_cdc,账号为test。

  2. 使用创建的测试账号连接MySQL实例,详情请参见通过DMS登录RDS MySQL

  3. 在MySQL中执行以下命令,创建数据表。

    use test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
  4. 使用SSH方式登录StarRocks集群,详情请参见登录集群

  5. 执行以下,连接StarRocks集群。

    mysql -h127.0.0.1 -P 9030 -uroot
  6. 执行以下命令,创建用户和授权。

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED by '123456';
    GRANT CREATE TABLE ON DATABASE test_cdc TO test;

步骤二:在实时计算Flink控制台通过SQL客户端创建Catalog

在阿里云实时计算Flink控制台的作业开发页面中,创建MySQL和StarRocks的Catalog。详情请参见Flink SQL作业快速入门

说明

参数仅供参考格式,具体内容请根据实际情况配置。

  • MySQL Catalog

    • 代码示例

      CREATE CATALOG mysql WITH (
        'type' = 'mysql',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr-test',
        'password' = '123456',
        'default-database' = 'test_cdc'
      );
    • 参数配置

      参数

      说明

      type

      类型,固定值为mysql。

      hostname

      RDS的内网地址。您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com。

      port

      MySQL数据库服务的端口号,默认值为3306。

      username

      MySQL数据库服务的用户名。

      填写步骤一:准备测试数据中账号的用户名。本示例为test。

      password

      MySQL数据库服务的密码。

      填写步骤一:准备测试数据中账号的密码。

      default-database

      默认的MySQL数据库名称。

      填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

  • StarRocks Catalog

    • 代码示例

      CREATE CATALOG sr  WITH (
        'type' = 'starrocks',
        'endpoint' = '172.16.**.**:9030',
        'username' = 'test',
        'password' = '123456',
        'dbname' = 'test_cdc'
      );
    • 参数配置

      参数

      说明

      type

      类型,固定值为starrocks。

      endpoint

      StarRocks FE的IP地址和端口。

      username

      StarRocks的用户名。

      填写步骤一:准备测试数据中账号的用户名。本示例为test。

      password

      StarRocks数据库服务的密码。

      填写步骤一:准备测试数据中账号的密码。

      dbname

      StarRocks数据库名称。

      填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

步骤三:创建并上线作业

  1. 在阿里云实时计算Flink控制台的作业开发页面,编写CTAS语句。

    您可以使用以下三种示例发送CTAS语句。

    • AtLeast once语义:通过sink.buffer-flush.interval-ms配置项,配置每次写入StarRocks的时间间隔,优点是写入间隔时间短,占用内存较少。

      /*
            AtLeast once 语义
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl_sr with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
      'load-url'='172.16.**.**:18030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '123456',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • Exactly once语义:需要定义checkpoint间隔,优点是在各种异常情况下保障数据不丢失不重复,缺点是数据可见时间取决于checkpoint间隔。更多信息,请参见Checkpointing

      /*
            Exactly once 语义。
      */
      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
      'load-url'='172.16.**.**:18030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '123456',
      'sink.semantic' = 'exactly-once',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • Simple模式:优点是创建表时不需要关注原表有哪些字段,会按照MySQL的表格式照搬过来,开发者使用比较方便。缺点是不能创建分区,对于需要分区的表,仍需要通过normal模式创建。

      /*
            上面两个为normal模式,本示例演示simple模式
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'='buckets 8',
      'starrocks.create.table.mode'='simple',
       'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
      'load-url'='172.16.**.**:18030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '123456',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr-test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      

    表 1. WITH参数

    参数

    是否必选

    描述

    starrocks.create.table.properties

    StarRocks建表语句中除了字段定义以外的其他后缀定义,例如示例中的engine、key和buckets等。

    database-name

    StarRocks数据库名称。

    本示例为test_cdc。

    jdbc-url

    用于在StarRocks中执行查询操作。

    例如,jdbc:mysql://172.16.**.**:9030。其中,172.16.**.**为StarRocks集群的内网IP地址。

    load-url

    指定FE的IP地址和HTTP端口,格式为StarRocks集群的内网IP地址:端口。本文以8030端口为例,实际请根据您的集群版本选择访问的端口:

    • 18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。

    • 8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。

    说明

    访问端口详情,请参见UI和端口

    sink.semantic

    填写exactly-once可以保障数据一致性语义,默认为at-least-once。

    starrocks.create.table.mode

    支持以下参数值:

    • normal模式(默认值):必须像示例一样在starrocks.create.table.properties配置中填写engine、key和buckets等完整的配置。

    • simple模式:默认选择engine为olap,选择key类型为primary key,且主键与MySQL的主键保持完全一致,默认distributed by hash(所有的主键),默认无分区。需要在starrocks.create.table.properties配置中填写的必填内容为buckets ,选填内容为properties等配置。

    说明
    • 因为vvr-6.0.5-flink-1.15及以上版本移除了sink.use.new-api,所以使用vvr-6.0.5-flink-1.15之前的版本时,请在with参数中添加'sink.use.new-api'='false',

    • 其他配置请参见从Apache Flink持续导入

    表 2. OPTIONS参数

    参数

    描述

    connector

    类型,固定值为mysql-cdc。

    hostname

    RDS的内网地址。

    您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

    port

    MySQL数据库服务的端口号,默认值为3306。

    username

    MySQL数据库服务的用户名。

    填写步骤一:准备测试数据中账号的用户名。本示例为test。

    password

    MySQL数据库服务的密码。

    填写步骤一:准备测试数据中账号的密码。

    table-name

    StarRocks中的表名称。

    填写步骤一:准备测试数据中创建的表名。本示例为runoob_tbl。

    database-name

    默认的MySQL数据库名称。

    填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

  2. 在作业开发页面的高级配置中,选择vvr-6.0.3及以上的版本。

  3. 单击上线

  4. 在作业运维页面,单击目标作业操作列的启动

步骤四:场景演示

查询数据

  1. 使用SSH方式登录StarRocks集群,详情请参见登录集群

  2. 执行以下,连接StarRocks集群。

    mysql -h127.0.0.1 -P 9030 -uroot
  3. 在StarRocks连接窗口执行以下命令,查看表数据。

    use test_cdc;
    select * from runoob_tbl1;

    返回信息如下,表示MySQL上的数据已同步至StarRocks。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

查询插入后的数据

  1. 在RDS数据库窗口执行以下命令,插入数据。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1)
  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示数据已成功插入。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步数据更新

  1. 在RDS数据库窗口执行以下命令,更新指定数据。

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18
  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示数据已同步更新。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步数据删除

  1. 在RDS数据库窗口执行以下命令,删除指定数据。

    DELETE FROM runoob_tbl WHERE runoob_id = 1
  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示数据已同步删除。

    +-----------+--------------+---------------+-----------------+---------+ 
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | 
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2022-06-22      |       3 | 
    +-----------+--------------+---------------+-----------------+---------+

增加可空列

  1. 在RDS数据库窗口执行以下命令,增加可空列。

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. 执行以下命令 ,插入数据。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`)  values(1,'second','tom2','2022-06-23',1,2)
  3. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示Schema已经成功变更。

    +-----------+--------------+---------------+-----------------+---------+----------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 | 
    +-----------+--------------+---------------+-----------------+---------+----------+ 
    |        18 | new          | tom           | 2022-06-22      |       3 |     NULL |
    +-----------+--------------+---------------+-----------------+---------+----------+ 
    |         1 | second       | tom2          | 2022-06-23      |       1 |      2   |
    |        18 | first        | tom           | 2022-06-22      |       3 |     NULL |
    +-----------+--------------+---------------+-----------------+---------+----------+ 

CDAS介绍

CDAS是CTAS的一个语法糖。通过CDAS语句,可以实现MySQL中的整库同步,即生成一个Flink Job。Source是MySQL中的database,目标表是StarRocks中对应的多张表,同时可以使用including table语法,只选择一个database中的部分表进行CDAS操作。

与CTAS的执行相同,需要在创建MySQL和StarRocks相应的Catalog后,执行CDAS语句。创建语法示例如下。

CREATE DATABASE IF NOT EXISTS sr_db with (
  'starrocks.create.table.properties'=' buckets 8',
  'starrocks.create.table.mode'='simple',
  'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
  'load-url'='172.16.**.**:18030',
  'username'='test',
  'password' = '123456',
  'sink.buffer-flush.interval-ms' = '5000' ,
  'sink.properties.row_delimiter' = '\x02',
  'sink.properties.column_separator' = '\x01'
)
as DATABASE mysql.test_cdc including table  'tabl1','tbl2','tbl3'
/*+ OPTIONS (   'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',   
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;