使用阿里云实时计算Flink实现MySQL至StarRocks的数据同步

本文为您介绍如何使用阿里云实时计算Flink的VVP平台同步MySQL数据到E-MapReduce的StarRocks。

前提条件

使用限制

  • RDS MySQL须为5.7及以上版本。

  • 创建的VVP集群、StarRocks集群以及RDS MySQL实例需要在同一个VPC下,并且在同一个可用区下。

  • StarRocks集群须为EMR-3.42.0及以上版本。

  • Flink的引擎须为vvr-4.0.11-flink-1.13及以上版本。

注意事项

如果RDS的表有修改(ALTER TABLE),则MySQL修改后的Schema变更需要在StarRocks手动同步。如果RDS的表有新建,则MySQL新加的表需要重新运行StarRocks Migrate Tool以进行数据同步。

操作流程

  1. 步骤一:准备测试数据

  2. 步骤二:通过VVP创建自定义Connector

  3. 步骤三:通过VVP创建MySQL的Catalog

  4. 步骤四:通过VVP创建StarRocks结果表

  5. 步骤五:通过VVP启动作业

  6. 步骤六:场景演示

步骤一:准备测试数据

  1. 创建测试的数据库和账号,详情请参见创建数据库和账号

    创建完数据库和账号后,需要授权测试账号的读写权限。

    说明

    本文创建的数据库名称为test_cdc,账号为emr_test。

  2. 使用创建的测试账号连接MySQL实例,详情请参见通过DMS登录RDS MySQL

  3. 执行以下命令,创建数据表。

    /*
       MySQL建表语句
    */
    CREATE TABLE test_cdc.`runoob_tbl` (
      `runoob_id` int unsigned NOT NULL AUTO_INCREMENT,
      `runoob_title` varchar(100) NOT NULL,
      `runoob_author` varchar(40) NOT NULL,
      `submission_date` date DEFAULT NULL,
      `add_col` int DEFAULT NULL,
      PRIMARY KEY (`runoob_id`)
    ) ENGINE=InnoDB
    
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
  4. 在RDS控制台的数据安全性页面设置Flink网段的白名单,详情请参见通过客户端、命令行连接RDS MySQL实例中的步骤2。

    您可以在实时计算管理控制台,单击目标工作空间操作列下的更多 > 工作空间详情查看Flink网段。

  5. 使用SSH方式登录StarRocks集群,详情请参见登录集群

  6. 执行以下,连接StarRocks集群。

    mysql -h127.0.0.1 -P 9030 -uroot
  7. 执行以下命令,创建用户、授权和建表。

    /*
       StarRocks建表语句
    */
    CREATE USER 'test' IDENTIFIED by '123456';
    
    CREATE DATABASE test_cdc;
    
    GRANT ALL on test_cdc to test;
    
    use test_cdc;
    
    CREATE TABLE `runoob_tbl1` (
      `runoob_id` bigint(20) NOT NULL COMMENT "",
      `runoob_title` varchar(100) NOT NULL COMMENT "",
      `runoob_author` varchar(40) NOT NULL COMMENT "",
      `submission_date` date NULL COMMENT "",
      `add_col` int(11) NULL COMMENT ""
    ) ENGINE=OLAP
    PRIMARY KEY(`runoob_id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`runoob_id`) BUCKETS 8;

步骤二:通过VVP创建自定义Connector

重要

vvr-6.0.3-flink-1.15及以上版本可以直接跳过此步骤,使用内置的StarRocks-Connector。

  1. 登录实时计算管理控制台

  2. 在实时计算控制台,单击目标工作空间操作列下的控制台。

  3. 在左侧导航栏,选择应用 > 作业开发

  4. 创建Connector。

    1. 作业开发页面,单击Connectors页签。

    2. 选择引擎版本。

      重要

      引擎须为vvr-4.0.11-flink-1.13及以上版本。

    3. 单击Connectors所在行的add图标。

    4. 创建Connector对话框中,选择flink-connector-starrocks-1.2.3_flink-1.13_2.11.jar文件,单击继续

    5. Formats下拉列表中选择jsoncsv,单击完成

      其余参数使用默认值即可。创建完成后自定义的Connector会出现在Connectors列表中。

步骤三:通过VVP创建MySQL的Catalog

  1. 在实时计算控制台的作业开发页面,单击新建

  2. 在新建文件对话框中,输入文件名称文件类型使用默认的SQL类型,单击确认

  3. 在文本编辑区域,输入配置MySQL Catalog的命令。

    CREATE CATALOG mysql WITH (
      'type' = 'mysql',
      'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = '******',
      'default-database' = 'test_cdc'
    );

    参数

    说明

    type

    类型,固定值为mysql。

    hostname

    RDS的内网地址。您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com。

    port

    MySQL数据库服务的端口号,默认值为3306。

    username

    MySQL数据库服务的用户名。

    填写步骤一:准备测试数据中账号的用户名。本示例为emr_test。

    password

    MySQL数据库服务的密码。

    填写步骤一:准备测试数据中账号的密码。

    default-database

    默认的MySQL数据库名称。

    填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

  4. 单击验证,进行语法检查。

  5. 验证通过后,单击上方的执行

    执行完会提示Query has been executed。如果执行失败,请仔细检查各参数是否填写正确。

  6. 在左侧,单击Schemas页签。

  7. 单击refresh图标,刷新查看新建的MySQL Catalog。

步骤四:通过VVP创建StarRocks结果表

  1. 在实时计算控制台的作业开发页面,单击新建

  2. 在新建文件对话框中,输入文件名称文件类型使用默认的SQL类型,单击确认

  3. 拷贝以下作业代码到作业文本编辑区。

    CREATE TEMPORARY TABLE sr_result (
        runoob_id BIGINT,
        runoob_title VARCHAR,
        runoob_author VARCHAr,
        submission_date date,
        add_col int,
        PRIMARY KEY (runoob_id) NOT ENFORCED
      )
    with (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030',
        'load-url' = '192.168.**.**:8030',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl1',
        'username' = 'emr_test',
        'password' = '******',
        'sink.buffer-flush.interval-ms' = '5000',
        'sink.properties.row_delimiter' = '\x02',
        'sink.properties.column_separator' = '\x01'
      );
    
    INSERT INTO sr_result
    SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col
    from
      mysql.test_cdc.`runoob_tbl`;

    参数

    说明

    connector

    固定值为starrocks。

    jdbc-url

    用于在StarRocks中执行查询操作。

    例如,jdbc:mysql://10.0.**.**:9030。其中,10.0.**.**为StarRocks集群的内网IP地址。

    load-url

    指定FE的IP地址和HTTP端口,格式为StarRocks集群的内网IP地址:端口。本文以8030端口为例,实际请根据您的集群版本选择访问的端口:

    • 18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。

    • 8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。

    说明

    访问端口详情,请参见UI和端口

    datdatabase-name

    StarRocks中的数据库名称。

    填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

    table-name

    StarRocks中的表名称。

    填写步骤一:准备测试数据中创建的表名。本示例为runoob_tbl1。

    username

    StarRocks的用户名。

    填写步骤一:准备测试数据中创建的用户名。本示例为test。

    password

    StarRocks的密码。

    填写步骤一:准备测试数据中设置的密码。本示例为123456。

    sink.buffer-flush.interval-ms

    Buffer刷新时间间隔,取值范围为1000 ms~3600000 ms。

    sink.properties.row_delimiter

    自定义行分隔符。

    sink.properties.column_separator

    自定义列分隔符。

    其中with选项的详细信息,请参见StarRocks官网的使用flink-connector-starrocks导入至StarRocks

    重要
    • 如果sink.semantic设置为exactly-once,则需要配合checkpoint使用,且checkpoint周期不宜过长(数据只在一个checkpoint周期结束后才可见,checkpoint期间数据会存储在flink内存中)。

    • 默认使用csv格式进行导入,您可以通过指定'sink.properties.row_delimiter' = '\\x02'(此参数自StarRocks-1.15.0 开始支持)与'sink.properties.column_separator' = '\\x01'来自定义行分隔符与列分隔符。

  4. 单击验证,进行语法检查。

  5. 验证通过后,单击上线

步骤五:通过VVP启动作业

  1. 在实时计算控制台的左侧导航栏中,单击作业运维

  2. 作业运维页面,单击目标作业名称操作列中的启动

  3. 在弹出的对话框中,单击启动

    直到状态变为运行中,则代表作业运行正常,您可以导入数据。

步骤六:场景演示

查询数据

  1. 使用SSH方式登录StarRocks集群,详情请参见登录集群

  2. 执行以下,连接StarRocks集群。

    mysql -h127.0.0.1 -P 9030 -uroot
  3. 在StarRocks连接窗口执行以下命令,查看表数据。

    use test_cdc;
    select * from runoob_tbl1;

    返回信息如下,表示MySQL上的数据已同步至StarRocks。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

查询插入后的数据

  1. 在RDS数据库窗口执行以下命令,插入数据。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1)
  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示数据已成功插入。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步数据更新

  1. 在RDS数据库窗口执行以下命令,更新指定数据。

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18
  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示数据已同步更新。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步数据删除

  1. 在RDS数据库窗口执行以下命令,删除指定数据。

    DELETE FROM runoob_tbl WHERE runoob_id = 1
  2. 在StarRocks连接窗口执行以下命令,查看表数据。

    select * from runoob_tbl1;

    返回信息如下,表示数据已同步删除。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Flink与StarRocks数据类型映射关系

Flink数据类型

StarRocks数据类型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

BINARY

INT

CHAR

STRING

VARCHAR

STRING

STRING

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE(N)

DATETIME

TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)

DATETIME

ARRAY\<T>

ARRAY\<T>

MAP\<KT,VT>

JSON STRING

ROW\<arg T...>

JSON STRING

常见问题

  • Q:导入StarRocks的数据存在时区不一致问题该如何处理?

  • A:您可以在Insert into语句中以hint语法增加时区配置来解决该问题,示例如下。

    INSERT INTO sr_result
    SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col
    from
     mysql.test_cdc.`runoob_tbl` /*+
    OPTIONS('server-time-zone'='Asia/Shanghai') */;