本文为您介绍如何使用Flink CDCMySQL数据同步至EMR Serverless StarRocks中。

前提条件

说明 本文以5.7版本的MySQL、EMR-3.39.1版本的DataFlow集群为例介绍。

使用限制

  • DataFlow集群、EMR Serverless StarRocks实例和RDS MySQL实例需要在同一个VPC下。
  • DataFlow集群和EMR Serverless StarRocks实例均须开启公网访问。
  • RDS MySQL须为5.7及以上版本。

操作流程

  1. 步骤一:准备测试数据
  2. 步骤二:配置同步工具和启动Flink任务
  3. 步骤三:验证数据同步结果

步骤一:准备测试数据

  1. 创建测试的数据库和账号,详情请参见创建数据库和账号
    创建完数据库和账号后,需要授权测试账号的读写权限。
    说明 本文创建的数据库名称为test_cdc。
  2. 使用创建的测试账号连接MySQL实例,详情请参见通过DMS登录RDS MySQL
  3. 执行以下命令,创建数据表。
    CREATE TABLE test_cdc.`t_user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      `age` tinyint(4) DEFAULT NULL,
      `create_time` datetime DEFAULT NULL,
      `update_time` datetime DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

步骤二:配置同步工具和启动Flink任务

  1. 使用SSH方式登录DataFlow集群,详情请参见登录集群
  2. 下载Flink CDC connectorFlink StarRocks Connector
    说明 下载过程需要一定时间,请耐心等待。
  3. 执行以下命令,将下载的Flink CDC ConnectorFlink StarRocks Connector文件复制到DataFlow集群的/opt/apps/FLINK/flink-current/lib目录下。
    cp flink-* /opt/apps/FLINK/flink-current/lib/
  4. 执行以下命令,启动集群。
    重要 本文示例仅供测试,如果是生产级别的Flink作业请使用YARNKubernetes方式提交,详情请参见Apache Hadoop YARNNative Kubernetes
    /opt/apps/FLINK/flink-current/bin/start-cluster.sh
  5. 下载并修改配置文件。
    1. 下载StarRocks Migrate Tool,并上传到DataFlow集群的root目录下。
    2. 执行以下命令,解压缩smt.tar.gz文件。
      tar -zxvf smt.tar.gz && cd smt
    3. 执行以下命令,编辑config_prod.conf文件。
      vim conf/config_prod.conf
      请根据实际信息修改各参数值,各参数描述如下表所示。
      参数描述
      hostRDS的内网地址。

      您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

      port固定值3306。
      userRDS上创建的账号。

      填写步骤一:准备测试数据中账号的用户名。

      passwordRDS上创建账号的密码。

      填写步骤一:准备测试数据中账号的密码。

      be_numEMR Serverless StarRocks实例的BE节点(Backend)个数,如果是最小集群,则直接设置为1。
      database正则表达式用于匹配RDS数据库的名称,表示需要同步到StarRocks的数据库。例如,^test.*$
      table正则表达式用于匹配RDS表的名称,表示需要同步到StarRocks的表。例如,^.*$
      flink.starrocks.jdbc-url用于在StarRocks中执行查询操作。
      例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.comEMR Serverless StarRocks实例FE节点的内网地址。
      说明 关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情
      flink.starrocks.load-url指定FE节点的内网地址和HTTP端口,格式为EMR Serverless StarRocks实例FE节点的内网地址:8030
      例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。
      说明 关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情
      flink.starrocks.usernameStarRocks连接用户名。
      flink.starrocks.passwordStarRocks连接密码。
      说明 默认值为空,可以不填写密码。
      配置文件中的StarRocks相关配置示例如下,其他参数的配置示例请参见更多信息
      flink.starrocks.jdbc-url=jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030
      flink.starrocks.load-url=fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030
      flink.starrocks.username=admin
      flink.starrocks.password=1qaz!QAZ
  6. 执行以下命令,将所有建表语句都生成在result目录下。
    ./starrocks-migrate-tool

    您可以通过ls result命令,查看result下的目录。

    返回信息如下所示。
    flink-create.1.sql  flink-create.all.sql  starrocks-create.1.sql  starrocks-create.all.sql  starrocks-external-create.1.sql  starrocks-external-create.all.sql
    说明
    • 本文示例的配置文件仅定义了table-rule.1等规则,.1.sql格式文件对应的就是table-rule.1的建表语句。如果您的配置文件有table-rule.2等规则,则.all.sql文件为所有规则的集合。
    • external-create后缀的文件,为对应数据源的外表。如果对于部分场景小的维表您不想同步,则可以直接通过外表查询,使用该文件可以生成对应的外表。本文示例未使用。
  7. 执行以下命令,创建StarRocks表。
    mysql -h<EMR Serverless StarRocks实例FE节点的内网地址> -P9030 -uroot -p < result/starrocks-create.1.sql
    说明 如果修改config_prod.conf文件时,没有设置StarRocks连接密码,则直接按回车键。
  8. 执行以下命令,启动Flink任务。
    /opt/apps/FLINK/flink-current/bin/sql-client.sh -f result/flink-create.1.sql

步骤三:验证数据同步结果

查询数据

  1. 登录并连接EMR Serverless StarRocks实例,详情请参见连接StarRocks实例(客户端方式)
  2. 执行以下命令,查看数据库信息。
    show databases;
    返回信息如下所示。
    +--------------------+
    | Database           |
    +--------------------+
    | _statistics_       |
    | information_schema |
    | test_cdc           |
    +--------------------+
    3 rows in set (0.00 sec)
  3. StarRocks连接窗口执行以下命令,查看表数据。
    use test_cdc;
    select * from t_user;

    由于本文示例中t_user表中还没有数据,因此预期返回数据为空。

查询插入后的数据

  1. RDS数据库窗口执行以下命令,插入数据。
    INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.0",30,NOW(),NOW());
    INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.1",31,NOW(),NOW());
    INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.2",32,NOW(),NOW());
  2. StarRocks连接窗口执行以下命令,查看表数据。
    select * from t_user;
    返回信息如下,表示数据已成功插入。
    +------+--------------+------+---------------------+---------------------+
    | id   | name         | age  | create_time         | update_time         |
    +------+--------------+------+---------------------+---------------------+
    | 4    | aliyun.com.0 |   30 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 |
    | 5    | aliyun.com.1 |   31 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 |
    | 6    | aliyun.com.2 |   32 | 2022-03-10 13:22:42 | 2022-03-10 13:22:42 |
    +------+--------------+------+---------------------+---------------------+
    3 rows in set (0.00 sec)

同步数据更新

  1. RDS数据库窗口执行以下命令,更新指定数据。
    UPDATE test_cdc.t_user SET age=35 where name="aliyun.com.0";
  2. StarRocks连接窗口执行以下命令,查看表数据。
    select * from t_user where name = "aliyun.com.0";
    返回信息如下,表示数据已同步更新。
    +------+--------------+------+---------------------+---------------------+
    | id   | name         | age  | create_time         | update_time         |
    +------+--------------+------+---------------------+---------------------+
    | 4    | aliyun.com.0 |   35 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 |
    +------+--------------+------+---------------------+---------------------+
    1 row in set (0.01 sec)

同步数据删除

  1. RDS数据库窗口执行以下命令,删除指定数据。
    DELETE FROM test_cdc.t_user where 1=1;
  2. StarRocks连接窗口执行以下命令,查看表数据。
    select * from t_user;
    返回信息如下,表示数据已同步删除。
    Empty set (0.01 sec)

更多信息

  • 本文档仅供测试使用,生产级别的Flink作业请使用阿里云VVP产品进行配置,或者使用YARN或者Kubernetes提交作业。

    详情请参见Apache Hadoop YARNNative Kubernetes

  • 如果RDS的表有修改(ALTER TABLE),则MySQLalter table之后的Schema变更需要在StarRocks中手动同步。如果RDS的表有新建,则MySQL新建的表需要重新运行StarRocks Migrate Tool以进行数据同步。
  • StarRocks Migrate Tool的配置文件示例。
    [db]
    host = rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com
    port = 3306
    user = ***
    password = ***
    # currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`
    type = mysql
    # # only takes effect on `type == hive`.
    # # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
    # authentication = kerberos
    
    [other]
    # number of backends in StarRocks
    be_num = 1
    # `decimal_v3` is supported since StarRocks-1.8.1
    use_decimal_v3 = false
    # directory to save the converted DDL SQL
    output_dir = ./result
    
    
    # !!!`database` `table` `schema` are case sensitive in `oracle`!!!
    [table-rule.1]
    # pattern to match databases for setting properties
    # !!! database should be a `whole instance(or pdb) name` but not a regex when it comes with an `oracle db` !!!
    database = ^test.*$
    # pattern to match tables for setting properties
    table = ^.*$
    # `schema` only takes effect on `postgresql` and `oracle` and `sqlserver`
    schema = ^.*$
    
    ############################################
    ### starrocks table configurations
    ############################################
    # # set a column as the partition_key
    # partition_key = p_key
    # # override the auto-generated partitions
    # partitions = START ("2021-01-02") END ("2021-01-04") EVERY (INTERVAL 1 day)
    # # only take effect on tables without primary keys or unique indexes
    # duplicate_keys=k1,k2
    # # override the auto-generated distributed keys
    # distributed_by=k1,k2
    # # override the auto-generated distributed buckets
    # bucket_num=32
    # # properties.xxxxx: properties used to create tables
    # properties.in_memory = false
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030
    flink.starrocks.load-url=fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030
    flink.starrocks.username=admin
    flink.starrocks.password=1qaz!QAZ
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
    # # used to set the server-id for mysql-cdc jobs instead of using a random server-id
    # flink.cdc.server-id = 5000
    ############################################
    ### flink-cdc plugin configuration for `postgresql`
    ############################################
    # # for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
    # # refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
    # # and https://debezium.io/documentation/reference/postgres-plugins.html
    # flink.cdc.decoding.plugin.name = decoderbufs