本文为您介绍如何使用阿里云实时计算Flink的VVP平台同步MySQL数据到E-MapReduce的StarRocks。
前提条件
已开通阿里云实时计算Flink全托管,详情请参见开通Flink全托管。
已创建StarRocks集群,详情请参见创建StarRocks集群。
说明Core实例数量设置为3。
已创建RDS MySQL,详情请参见创建RDS MySQL实例。
说明本文以MySQL 5.7版本为例介绍。
使用限制
RDS MySQL须为5.7及以上版本。
创建的VVP集群、StarRocks集群以及RDS MySQL实例需要在同一个VPC下,并且在同一个可用区下。
StarRocks集群须为EMR-3.42.0及以上版本。
Flink的引擎须为vvr-4.0.11-flink-1.13及以上版本。
注意事项
如果RDS的表有修改(ALTER TABLE
),则MySQL修改后的Schema变更需要在StarRocks手动同步。如果RDS的表有新建,则MySQL新加的表需要重新运行StarRocks Migrate Tool以进行数据同步。
操作流程
步骤一:准备测试数据
创建测试的数据库和账号,详情请参见创建数据库和账号。
创建完数据库和账号后,需要授权测试账号的读写权限。
说明本文创建的数据库名称为test_cdc,账号为emr_test。
使用创建的测试账号连接MySQL实例,详情请参见通过DMS登录RDS MySQL。
执行以下命令,创建数据表。
/* MySQL建表语句 */ CREATE TABLE test_cdc.`runoob_tbl` ( `runoob_id` int unsigned NOT NULL AUTO_INCREMENT, `runoob_title` varchar(100) NOT NULL, `runoob_author` varchar(40) NOT NULL, `submission_date` date DEFAULT NULL, `add_col` int DEFAULT NULL, PRIMARY KEY (`runoob_id`) ) ENGINE=InnoDB INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
在RDS控制台的数据安全性页面设置Flink网段的白名单,详情请参见通过客户端、命令行连接RDS MySQL实例中的步骤2。
您可以在实时计算管理控制台,单击目标工作空间操作列下的
查看Flink网段。使用SSH方式登录StarRocks集群,详情请参见登录集群。
执行以下,连接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
执行以下命令,创建用户、授权和建表。
/* StarRocks建表语句 */ CREATE USER 'test' IDENTIFIED by '123456'; CREATE DATABASE test_cdc; GRANT ALL on test_cdc to test; use test_cdc; CREATE TABLE `runoob_tbl1` ( `runoob_id` bigint(20) NOT NULL COMMENT "", `runoob_title` varchar(100) NOT NULL COMMENT "", `runoob_author` varchar(40) NOT NULL COMMENT "", `submission_date` date NULL COMMENT "", `add_col` int(11) NULL COMMENT "" ) ENGINE=OLAP PRIMARY KEY(`runoob_id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`runoob_id`) BUCKETS 8;
步骤二:通过VVP创建自定义Connector
vvr-6.0.3-flink-1.15及以上版本可以直接跳过此步骤,使用内置的StarRocks-Connector。
登录实时计算管理控制台。
在实时计算控制台,单击目标工作空间操作列下的控制台。
在左侧导航栏,选择 。
创建Connector。
在作业开发页面,单击Connectors页签。
选择引擎版本。
重要引擎须为vvr-4.0.11-flink-1.13及以上版本。
单击Connectors所在行的图标。
在创建Connector对话框中,选择flink-connector-starrocks-1.2.3_flink-1.13_2.11.jar文件,单击继续。
在Formats下拉列表中选择json和csv,单击完成。
其余参数使用默认值即可。创建完成后自定义的Connector会出现在Connectors列表中。
步骤三:通过VVP创建MySQL的Catalog
在实时计算控制台的作业开发页面,单击新建。
在新建文件对话框中,输入文件名称,文件类型使用默认的SQL类型,单击确认。
在文本编辑区域,输入配置MySQL Catalog的命令。
CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = '******', 'default-database' = 'test_cdc' );
参数
说明
type
类型,固定值为mysql。
hostname
RDS的内网地址。您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com。
port
MySQL数据库服务的端口号,默认值为3306。
username
MySQL数据库服务的用户名。
填写步骤一:准备测试数据中账号的用户名。本示例为emr_test。
password
MySQL数据库服务的密码。
填写步骤一:准备测试数据中账号的密码。
default-database
默认的MySQL数据库名称。
填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。
单击验证,进行语法检查。
验证通过后,单击上方的执行。
执行完会提示Query has been executed。如果执行失败,请仔细检查各参数是否填写正确。
在左侧,单击Schemas页签。
单击图标,刷新查看新建的MySQL Catalog。
步骤四:通过VVP创建StarRocks结果表
在实时计算控制台的作业开发页面,单击新建。
在新建文件对话框中,输入文件名称,文件类型使用默认的SQL类型,单击确认。
拷贝以下作业代码到作业文本编辑区。
CREATE TEMPORARY TABLE sr_result ( runoob_id BIGINT, runoob_title VARCHAR, runoob_author VARCHAr, submission_date date, add_col int, PRIMARY KEY (runoob_id) NOT ENFORCED ) with ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030', 'load-url' = '192.168.**.**:8030', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl1', 'username' = 'emr_test', 'password' = '******', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ); INSERT INTO sr_result SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col from mysql.test_cdc.`runoob_tbl`;
参数
说明
connector
固定值为starrocks。
jdbc-url
用于在StarRocks中执行查询操作。
例如,jdbc:mysql://10.0.**.**:9030。其中,10.0.**.**为StarRocks集群的内网IP地址。
load-url
指定FE的IP地址和HTTP端口,格式为
StarRocks集群的内网IP地址:端口
。本文以8030端口为例,实际请根据您的集群版本选择访问的端口:18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。
8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。
说明访问端口详情,请参见UI和端口。
datdatabase-name
StarRocks中的数据库名称。
填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。
table-name
StarRocks中的表名称。
填写步骤一:准备测试数据中创建的表名。本示例为runoob_tbl1。
username
StarRocks的用户名。
填写步骤一:准备测试数据中创建的用户名。本示例为test。
password
StarRocks的密码。
填写步骤一:准备测试数据中设置的密码。本示例为123456。
sink.buffer-flush.interval-ms
Buffer刷新时间间隔,取值范围为1000 ms~3600000 ms。
sink.properties.row_delimiter
自定义行分隔符。
sink.properties.column_separator
自定义列分隔符。
其中
with
选项的详细信息,请参见StarRocks官网的使用flink-connector-starrocks导入至StarRocks。重要如果sink.semantic设置为exactly-once,则需要配合checkpoint使用,且checkpoint周期不宜过长(数据只在一个checkpoint周期结束后才可见,checkpoint期间数据会存储在flink内存中)。
默认使用csv格式进行导入,您可以通过指定
'sink.properties.row_delimiter' = '\\x02'
(此参数自StarRocks-1.15.0 开始支持)与'sink.properties.column_separator' = '\\x01'
来自定义行分隔符与列分隔符。
单击验证,进行语法检查。
验证通过后,单击上线。
步骤五:通过VVP启动作业
在实时计算控制台的左侧导航栏中,单击作业运维。
在作业运维页面,单击目标作业名称操作列中的启动。
在弹出的对话框中,单击启动。
直到状态变为运行中,则代表作业运行正常,您可以导入数据。
步骤六:场景演示
查询数据
使用SSH方式登录StarRocks集群,详情请参见登录集群。
执行以下,连接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
在StarRocks连接窗口执行以下命令,查看表数据。
use test_cdc; select * from runoob_tbl1;
返回信息如下,表示MySQL上的数据已同步至StarRocks。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
查询插入后的数据
在RDS数据库窗口执行以下命令,插入数据。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1)
在StarRocks连接窗口执行以下命令,查看表数据。
select * from runoob_tbl1;
返回信息如下,表示数据已成功插入。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步数据更新
在RDS数据库窗口执行以下命令,更新指定数据。
update runoob_tbl set runoob_title= 'new' where runoob_id = 18
在StarRocks连接窗口执行以下命令,查看表数据。
select * from runoob_tbl1;
返回信息如下,表示数据已同步更新。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步数据删除
在RDS数据库窗口执行以下命令,删除指定数据。
DELETE FROM runoob_tbl WHERE runoob_id = 1
在StarRocks连接窗口执行以下命令,查看表数据。
select * from runoob_tbl1;
返回信息如下,表示数据已同步删除。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Flink与StarRocks数据类型映射关系
Flink数据类型 | StarRocks数据类型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY\<T> | ARRAY\<T> |
MAP\<KT,VT> | JSON STRING |
ROW\<arg T...> | JSON STRING |
常见问题
Q:导入StarRocks的数据存在时区不一致问题该如何处理?
A:您可以在Insert into语句中以hint语法增加时区配置来解决该问题,示例如下。
INSERT INTO sr_result SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col from mysql.test_cdc.`runoob_tbl` /*+ OPTIONS('server-time-zone'='Asia/Shanghai') */;