基于DataFlow集群的Flink服务使用CTAS语句同步MySQL数据至StarRocks

本文为您介绍如何使用EMR DataFlow集群中的Flink服务,通过CTAS语句将MySQL数据同步至EMR Serverless StarRocks中。

背景信息

您可以通过CTAS或CDAS语句将MySQL数据同步至EMR Serverless StarRocks,CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步。本文使用CTAS语句,CDAS语句的使用方法与CTAS类似,具体请参见CDAS介绍

通过CTAS(CREATE TABLE AS)语句,您可以在StarRocks中自动创建和MySQL中表结构一致的表,并进行数据同步。同时还能实时同步上游表结构(Schema)的变更到下游表,提高您在目标存储中创建表和维护源表结构变更的效率。

当执行CTAS语句时,Flink会按照以下流程执行:

  1. 检查目标存储中是否存在该目标表。

    • 如果不存在,则通过目标端Catalog在目标存储中创建相应的目标表,该目标表具有和数据源相同的Schema。

    • 如果存在,则跳过建表。如果已存在的目标表与源表Schema不一致,则会报错提示。

  2. 提交和启动相应的数据同步作业。同步数据源的数据以及Schema的变更到目标表中。

表结构变更同步策略通过CTAS语句,在实时同步数据的同时,还能同步源表Schema的变更到目标表中。

Schema变更包括初始表的创建以及未来表的变更。

  • 当前支持同步的Schema变更:

    • 添加可空列:会自动在目标表Schema末尾添加对应的列,并自动同步新增列的数据。

    • 删除可空列:不会直接在目标表中删除该列,而是将该列的数据自动填充为NULL值。

    • 重命名列:直接在目标表中末尾添加重命名后的列,并将重命名前的列数据自动填充为NULL值。

      例如,如果col_a重命名为col_b,则会在目标表末尾添加col_b,并自动将col_a的数据填充为NULL值。

  • 暂不支持同步的Schema变更:

    • 数据类型的变更。

      例如,由VARCHAR变为BIGINT,由NOT NULL变为NULLABLE属性。

    • 主键或索引等约束的变更。

    • 非空列的增加或删除的变更。

说明
  • 如果遇到不支持的Schema变更,则需要您手动删除下游目标表,重新启动CTAS作业,即重新创建目标表并重新同步历史数据。
  • CTAS不会识别具体的DDL类型,而是对比前后两条数据的Schema差异。因此,如果您先删除了某列后,又加回了该列,且这两个DDL之间无数据变化,则CTAS会认为没有发生结构变更。同理,如果您添加了一列,直到该表有数据变化,CTAS才会感知到结构变更,才会同步结构变更到目标表。
  • 通过CTAS建表支持的字段类型信息,请参见Flink与StarRocks的数据类型映射关系

前提条件

说明

本文以5.7版本的MySQL、EMR-3.42.0版本的DataFlow集群为例介绍。

使用限制

  • DataFlow集群、EMR Serverless StarRocks实例和RDS MySQL实例需要在同一个VPC下。

  • DataFlow集群和EMR Serverless StarRocks实例均须开启公网访问。

  • RDS MySQL须为5.7及以上版本。

  • DataFlow集群须为EMR-3.42.0及后续版本或EMR-5.8.0及后续版本。

步骤一:准备测试数据

  1. 创建测试的数据库和账号,详情请参见创建数据库和账号

    创建完数据库和账号后,需要授权测试账号的读写权限。

    说明

    本文创建的数据库名称为test_cdc,账号为emr_test。

  2. 使用创建的测试账号连接MySQL实例,详情请参见通过DMS登录RDS MySQL

  3. 执行以下命令,创建数据表。

    use test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
  4. 登录并连接EMR Serverless StarRocks实例,详情请参见通过客户端方式连接StarRocks实例

  5. 执行以下命令,创建数据库test_cdc、创建超级管理员用户test(示例密码为1qaz!QAZ)或者创建普通用户test并给普通用户授予该数据库权限,详情请参见管理用户

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED by '1qaz!QAZ';
    GRANT ALL on test_cdc to test;

步骤二:上传自定义Connector

上传自定义的Connector用于Flink、StarRocks和RDS MySQL连接。

  1. 使用SSH方式登录DataFlow集群,详情请参见登录集群

  2. 下载flink-connector-starrocks-1.2.2_flink-1.13_2.11.jarververica-connector-mysql-1.13-vvr-4.0.12-1-20220330.065158-3-jar-with-dependencies.jar,并上传到DataFlow集群的/opt/apps/FLINK/flink-current/lib目录下。

步骤三:执行CTAS操作

  1. 通过Session模式提交作业。

    1. 使用SSH方式登录DataFlow集群,详情请参见登录集群

    2. 执行以下命令,进入/opt/apps/FLINK/flink-current目录。

      cd /opt/apps/FLINK/flink-current
    3. 执行以下命令,启动YARN Session。

      ./bin/yarn-session.sh --detached

      执行成功后,返回信息中的application_XXXX_YY,即为登录SQL客户端需要用到的sessionId。sessionid

    4. 执行以下命令,打开SQL客户端。

      ./bin/sql-client.sh -s <application_XXXX_YY>
      说明

      请修改<application_XXXX_YY>为您前一步获取到的sessionId。

  2. 创建MySQL和StarRocks的Catalog。

    CREATE CATALOG sr WITH (
      'type' = 'starrocks',
      'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'username' = 'test',
      'password' = '1qaz!QAZ',
      'dbname' = 'test_cdc'
    );
    
    CREATE CATALOG mysql WITH (
      'type' = 'mysql',
      'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = '123456',
      'default-database' = 'test_cdc'
    );

    请根据实际信息修改各参数值,各参数描述如下表所示。

    表 1. StarRocks Catalog参数

    参数

    描述

    type

    类型,固定值为starrocks。

    endpoint

    指定FE节点的内网地址和查询端口,格式为EMR Serverless StarRocks实例FE节点的内网地址:9030。例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。

    说明

    关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情

    username

    StarRocks的用户名。

    填写步骤一:准备测试数据中创建的用户名。本示例为test。

    password

    StarRocks数据库服务的密码。

    填写步骤一:准备测试数据中账号设置的密码。本示例为1qaz!QAZ。

    dbname

    StarRocks数据库名称。

    填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

    表 2. MySQL Catalog参数

    参数

    描述

    type

    类型,固定值为mysql。

    hostname

    RDS的内网地址。

    您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

    port

    MySQL数据库服务的端口号,默认值为3306。

    username

    MySQL数据库服务的用户名。

    填写步骤一:准备测试数据中账号的用户名。本示例为emr_test。

    password

    MySQL数据库服务的密码。

    填写步骤一:准备测试数据中账号的密码。本示例为123456。

    default-database

    默认的MySQL数据库名称。

    填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

  3. 在StarRocks的Catalog下,发送CTAS语句。

    您可以使用以下三种示例发送CTAS语句。

    • At Least Once语义:通过sink.buffer-flush.interval-ms配置项,配置每次写入StarRocks的时间间隔,优点是写入间隔时间短,占用内存较少。

      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
    • Exactly once语义:需要定义checkpoint间隔,优点是在各种异常情况下保障数据不丢失不重复,缺点是数据可见时间取决于checkpoint间隔。更多信息,请参见Checkpointing

      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.semantic' = 'exactly-once',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS ( 'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • Simple模式:优点是创建表时不需要关注原表有哪些字段,会按照MySQL的表格式照搬过来,开发者使用比较方便。缺点是不能创建分区,对于需要分区的表,仍需要通过normal模式创建。

      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'='buckets 8',
      'starrocks.create.table.mode'='simple',
       'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr_test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      

      表 3. WITH参数

      参数

      是否必选

      描述

      starrocks.create.table.properties

      StarRocks建表语句中除了字段定义以外的其他后缀定义,例如示例中的engine、key和buckets等。

      database-name

      StarRocks数据库名称。

      本示例为test_cdc。

      jdbc-url

      用于在StarRocks中执行查询操作。

      例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com为EMR Serverless StarRocks实例FE节点的内网地址。

      说明

      关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情

      load-url

      指定FE节点的内网地址和查询端口,格式为EMR Serverless StarRocks实例FE节点的内网地址:8030

      例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。

      说明

      关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情

      sink.semantic

      填写exactly-once可以保障数据一致性语义,默认为at-least-once。

      starrocks.create.table.mode

      支持以下参数值:

      • normal模式(默认值):必须像示例一样在starrocks.create.table.properties配置中填写engine、key和buckets等完整的配置。

      • simple模式:默认选择engine为olap,选择key类型为primary key,且主键与MySQL的主键保持完全一致,默认distributed by hash(所有的主键),默认无分区。需要在starrocks.create.table.properties配置中填写的必填内容为buckets ,选填内容为properties等配置。

      sink.properties.row_delimiter

      自定义行分隔符。

      sink.properties.column_separator

      自定义列分隔符。

      说明
      • 因为vvr-6.0.5-flink-1.15及以上版本移除了sink.use.new-api,所以使用vvr-6.0.5-flink-1.15之前的版本时,请在with参数中添加'sink.use.new-api'='false',

      • 其他配置请参见从Apache Flink持续导入

      表 4. OPTIONS参数

      参数

      描述

      connector

      类型,固定值为mysql-cdc。

      hostname

      RDS的内网地址。

      您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

      port

      MySQL数据库服务的端口号,默认值为3306。

      username

      MySQL数据库服务的用户名。

      填写步骤一:准备测试数据中账号的用户名。本示例为emr_test。

      password

      MySQL数据库服务的密码。

      填写步骤一:准备测试数据中账号的密码。

      table-name

      StarRocks中的表名称。

      填写步骤一:准备测试数据中创建的表名。本示例为runoob_tbl。

      database-name

      默认的MySQL数据库名称。

      填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

步骤四:验证数据同步结果

说明

如果开启了checkpoint,则最长等待时间大约是checkpoint的时间间隔。

查询数据

  1. 登录并连接EMR Serverless StarRocks实例,详情请参见通过客户端方式连接StarRocks实例

  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    use test_cdc;
    select * from runoob_tbl1;

    返回信息如下,表示MySQL上的数据已同步至StarRocks。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

查询插入后的数据

  1. 在RDS数据库窗口执行以下命令,插入数据。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1);
  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示数据已成功插入。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步数据更新

  1. 在RDS数据库窗口执行以下命令,更新指定数据。

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18;
  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示数据已同步更新。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步数据删除

  1. 在RDS数据库窗口执行以下命令,删除指定数据。

    DELETE FROM runoob_tbl WHERE runoob_id = 1;
  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示数据已同步删除。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

增加可空列

  1. 在RDS数据库窗口执行以下命令,增加可空列。

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. 执行以下命令 ,插入数据。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`)  values(1,'second','tom2','2022-06-23',1,2)
  3. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示Schema已经成功变更。

    +-----------+--------------+---------------+-----------------+---------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_co2 |
    +-----------+--------------+---------------+-----------------+---------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |       2 |
    |        18 | new          | tom           | 2022-06-22      |       3 |    NULL |
    +-----------+--------------+---------------+-----------------+---------+---------+

CDAS介绍

CDAS是CTAS的一个语法糖。通过CDAS语句,可以实现MySQL中的整库同步,即生成一个Flink Job,源表是MySQL中的Database,目标表是StarRocks中对应的多张表,同时可以使用including table语法,只选择一个Database中的部分表进行CDAS操作。

与CTAS的执行相同,需要在创建MySQL和StarRocks相应的Catalog后,执行CDAS语句。创建语法示例如下。

CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'username'='test',
'password' = '1qaz!QAZ',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
 as DATABASEmysql.test_cdc including table
 'tabl1','tbl2','tbl3'   /*+ OPTIONS (   'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'test',
  'password' = '123456',
  'database-name' = 'test_cdc' )*/;