本文为您介绍如何使用Flink CDC将MySQL数据同步至EMR Serverless StarRocks中。
前提条件
- 已在新版控制台创建DataFlow集群,详情请参见创建集群。
- 已创建EMR Serverless StarRocks实例,详情请参见创建实例。
- 已创建RDS MySQL,详情请参见创建RDS MySQL实例。
说明 本文以5.7版本的MySQL、EMR-3.39.1版本的DataFlow集群为例介绍。
使用限制
- DataFlow集群、EMR Serverless StarRocks实例和RDS MySQL实例需要在同一个VPC下。
- DataFlow集群和EMR Serverless StarRocks实例均须开启公网访问。
- RDS MySQL须为5.7及以上版本。
操作流程
步骤一:准备测试数据
- 创建测试的数据库和账号,详情请参见创建数据库和账号。创建完数据库和账号后,需要授权测试账号的读写权限。说明 本文创建的数据库名称为test_cdc。
- 使用创建的测试账号连接MySQL实例,详情请参见通过DMS登录RDS MySQL。
- 执行以下命令,创建数据表。
CREATE TABLE test_cdc.`t_user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` tinyint(4) DEFAULT NULL, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
步骤二:配置同步工具和启动Flink任务
- 使用SSH方式登录DataFlow集群,详情请参见登录集群。
- 下载Flink CDC connector和Flink StarRocks Connector。说明 下载过程需要一定时间,请耐心等待。
- 执行以下命令,将下载的Flink CDC Connector和Flink StarRocks Connector文件复制到DataFlow集群的/opt/apps/FLINK/flink-current/lib目录下。
cp flink-* /opt/apps/FLINK/flink-current/lib/
- 执行以下命令,启动集群。重要 本文示例仅供测试,如果是生产级别的Flink作业请使用YARN或Kubernetes方式提交,详情请参见Apache Hadoop YARN和Native Kubernetes。
/opt/apps/FLINK/flink-current/bin/start-cluster.sh
- 下载并修改配置文件。
- 执行以下命令,将所有建表语句都生成在result目录下。
./starrocks-migrate-tool
您可以通过
ls result
命令,查看result下的目录。返回信息如下所示。flink-create.1.sql flink-create.all.sql starrocks-create.1.sql starrocks-create.all.sql starrocks-external-create.1.sql starrocks-external-create.all.sql
说明- 本文示例的配置文件仅定义了table-rule.1等规则,.1.sql格式文件对应的就是table-rule.1的建表语句。如果您的配置文件有table-rule.2等规则,则.all.sql的文件为所有规则的集合。
external-create
后缀的文件,为对应数据源的外表。如果对于部分场景小的维表您不想同步,则可以直接通过外表查询,使用该文件可以生成对应的外表。本文示例未使用。
- 执行以下命令,创建StarRocks表。
mysql -h<EMR Serverless StarRocks实例FE节点的内网地址> -P9030 -uroot -p < result/starrocks-create.1.sql
说明 如果修改config_prod.conf文件时,没有设置StarRocks连接密码,则直接按回车键。 - 执行以下命令,启动Flink任务。
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f result/flink-create.1.sql
步骤三:验证数据同步结果
查询数据
- 登录并连接EMR Serverless StarRocks实例,详情请参见连接StarRocks实例(客户端方式)。
- 执行以下命令,查看数据库信息。
show databases;
返回信息如下所示。+--------------------+ | Database | +--------------------+ | _statistics_ | | information_schema | | test_cdc | +--------------------+ 3 rows in set (0.00 sec)
- 在StarRocks连接窗口执行以下命令,查看表数据。
use test_cdc; select * from t_user;
由于本文示例中t_user表中还没有数据,因此预期返回数据为空。
查询插入后的数据
- 在RDS数据库窗口执行以下命令,插入数据。
INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.0",30,NOW(),NOW()); INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.1",31,NOW(),NOW()); INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.2",32,NOW(),NOW());
- 在StarRocks连接窗口执行以下命令,查看表数据。
select * from t_user;
返回信息如下,表示数据已成功插入。+------+--------------+------+---------------------+---------------------+ | id | name | age | create_time | update_time | +------+--------------+------+---------------------+---------------------+ | 4 | aliyun.com.0 | 30 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 | | 5 | aliyun.com.1 | 31 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 | | 6 | aliyun.com.2 | 32 | 2022-03-10 13:22:42 | 2022-03-10 13:22:42 | +------+--------------+------+---------------------+---------------------+ 3 rows in set (0.00 sec)
同步数据更新
- 在RDS数据库窗口执行以下命令,更新指定数据。
UPDATE test_cdc.t_user SET age=35 where name="aliyun.com.0";
- 在StarRocks连接窗口执行以下命令,查看表数据。
select * from t_user where name = "aliyun.com.0";
返回信息如下,表示数据已同步更新。+------+--------------+------+---------------------+---------------------+ | id | name | age | create_time | update_time | +------+--------------+------+---------------------+---------------------+ | 4 | aliyun.com.0 | 35 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 | +------+--------------+------+---------------------+---------------------+ 1 row in set (0.01 sec)
同步数据删除
- 在RDS数据库窗口执行以下命令,删除指定数据。
DELETE FROM test_cdc.t_user where 1=1;
- 在StarRocks连接窗口执行以下命令,查看表数据。
select * from t_user;
返回信息如下,表示数据已同步删除。Empty set (0.01 sec)
更多信息
- 本文档仅供测试使用,生产级别的Flink作业请使用阿里云VVP产品进行配置,或者使用YARN或者Kubernetes提交作业。
- 如果RDS的表有修改(
ALTER TABLE
),则MySQL中alter table
之后的Schema变更需要在StarRocks中手动同步。如果RDS的表有新建,则MySQL新建的表需要重新运行StarRocks Migrate Tool以进行数据同步。 - StarRocks Migrate Tool的配置文件示例。
[db] host = rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com port = 3306 user = *** password = *** # currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse` type = mysql # # only takes effect on `type == hive`. # # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap # authentication = kerberos [other] # number of backends in StarRocks be_num = 1 # `decimal_v3` is supported since StarRocks-1.8.1 use_decimal_v3 = false # directory to save the converted DDL SQL output_dir = ./result # !!!`database` `table` `schema` are case sensitive in `oracle`!!! [table-rule.1] # pattern to match databases for setting properties # !!! database should be a `whole instance(or pdb) name` but not a regex when it comes with an `oracle db` !!! database = ^test.*$ # pattern to match tables for setting properties table = ^.*$ # `schema` only takes effect on `postgresql` and `oracle` and `sqlserver` schema = ^.*$ ############################################ ### starrocks table configurations ############################################ # # set a column as the partition_key # partition_key = p_key # # override the auto-generated partitions # partitions = START ("2021-01-02") END ("2021-01-04") EVERY (INTERVAL 1 day) # # only take effect on tables without primary keys or unique indexes # duplicate_keys=k1,k2 # # override the auto-generated distributed keys # distributed_by=k1,k2 # # override the auto-generated distributed buckets # bucket_num=32 # # properties.xxxxx: properties used to create tables # properties.in_memory = false ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030 flink.starrocks.load-url=fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030 flink.starrocks.username=admin flink.starrocks.password=1qaz!QAZ flink.starrocks.sink.max-retries=10 flink.starrocks.sink.buffer-flush.interval-ms=15000 flink.starrocks.sink.properties.format=json flink.starrocks.sink.properties.strip_outer_array=true # # used to set the server-id for mysql-cdc jobs instead of using a random server-id # flink.cdc.server-id = 5000 ############################################ ### flink-cdc plugin configuration for `postgresql` ############################################ # # for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming # # refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html # # and https://debezium.io/documentation/reference/postgres-plugins.html # flink.cdc.decoding.plugin.name = decoderbufs