CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步。本文为您介绍如何使用实时计算Flink平台和E-MapReduce StarRocks通过CTAS&CDAS功能实现实时数仓中TP(Transaction Processing)和AP(Analytical Processing)数据同步的场景。
背景信息
通过CTAS(CREATE TABLE AS)语句可以在StarRocks中自动创建和MySQL中表结构一致的表,并进行数据同步,还能实时同步上游表结构(Schema)的变更到下游表,提高您在目标存储中创建表和维护源表结构变更的效率。
当执行CTAS语句时,Flink会按照以下流程执行:
检查目标存储中是否存在该目标表。
如果不存在,则通过目标端Catalog在目标存储中创建相应的目标表,该目标表具有和数据源相同的Schema。
如果存在,则跳过建表。如果已存在的目标表与源表Schema不一致,则会报错提示。
提交和启动相应的数据同步作业。同步数据源的数据以及Schema的变更到目标表中。
表结构变更同步策略通过CTAS语句,在实时同步数据的同时,还能同步源表Schema的变更到目标表中。
Schema变更包括初始表的创建以及未来表的变更。
当前支持同步的Schema变更:
添加可空列:会自动在目标表Schema末尾添加对应的列,并自动同步新增列的数据。
删除可空列:不会直接在目标表中删除该列,而是将该列的数据自动填充为NULL值。
重命名列:被视为添加列和删除列。直接在目标表中末尾添加重命名后的列,并将重命名前的列数据自动填充为NULL值。
例如,如果col_a重命名为col_b,则会在目标表末尾添加col_b,并自动将col_a的数据填充为NULL值。
暂不支持同步的Schema变更:
数据类型的变更。
例如,由VARCHAR变为BIGINT,由NOT NULL变为NULLABLE属性。
主键或索引等约束的变更。
非空列的增加或删除的变更。
DDL中字段长度的调整。
如果遇到不支持的Schema变更,则需要您手动删除下游目标表,重新启动CTAS作业,即重新创建目标表并重新同步历史数据。
CTAS不会识别具体的DDL类型,而是对比前后两条数据的Schema差异。因此,如果您先删除了某列后,又加回了该列,且这两个DDL之间无数据变化,则CTAS会认为没有发生结构变更。同理,如果您添加了一列,直到该表有数据变化,CTAS才会感知到结构变更,才会同步结构变更到目标表。
通过CTAS建表支持的字段类型信息,请参见Flink与StarRocks的数据类型映射关系。
在使用CTAS语句合并MySQL多张表时,默认情况下,系统会自动在生成的新表结构最前面添加
_db_name
和_table_name
两列,用来追踪源数据表信息。由于这一自动添加行为不可更改,您在定义新表的列顺序时,请直接从第三列开始定义您期望的列顺序,以确保新表结构符合预期。
前提条件
已开通阿里云实时计算Flink全托管并创建了Flink集群,详情请参见开通Flink全托管和Flink SQL作业快速入门。
已创建StarRocks集群,详情请参见创建StarRocks集群。
已创建RDS MySQL,详情请参见创建RDS MySQL实例。
本文以5.7版本的MySQL、EMR-3.39.1版本的StarRocks集群和1.15-6.0.3版本的Flink为例介绍。
使用限制
创建的Flink集群、StarRocks集群以及RDS MySQL实例需要在同一个VPC下。
RDS MySQL须为5.7及以上版本。
StarRocks须开启公网访问。
Flink集群中的Flink须为1.15-vvr-6.0.3及以上版本。
步骤一:准备测试数据
创建测试的数据库和账号,详情请参见创建数据库和账号。
创建完数据库和账号后,需要授权测试账号的读写权限。
说明本文创建的数据库名称为test_cdc,账号为test。
使用创建的测试账号连接MySQL实例,详情请参见通过DMS登录RDS MySQL。
在MySQL中执行以下命令,创建数据表。
use test_cdc; CREATE TABLE IF NOT EXISTS `runoob_tbl`( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` int DEFAULT NULL, PRIMARY KEY ( `runoob_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
使用SSH方式登录StarRocks集群,详情请参见登录集群。
执行以下,连接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
执行以下命令,创建用户和授权。
CREATE DATABASE test_cdc; CREATE USER 'test' IDENTIFIED by '123456'; GRANT CREATE TABLE ON DATABASE test_cdc TO test;
步骤二:在实时计算Flink控制台通过SQL客户端创建Catalog
在阿里云实时计算Flink控制台的作业开发页面中,创建MySQL和StarRocks的Catalog。详情请参见Flink SQL作业快速入门。
参数仅供参考格式,具体内容请根据实际情况配置。
MySQL Catalog
代码示例
CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr-test', 'password' = '123456', 'default-database' = 'test_cdc' );
参数配置
参数
说明
type
类型,固定值为mysql。
hostname
RDS的内网地址。您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com。
port
MySQL数据库服务的端口号,默认值为3306。
username
MySQL数据库服务的用户名。
填写步骤一:准备测试数据中账号的用户名。本示例为test。
password
MySQL数据库服务的密码。
填写步骤一:准备测试数据中账号的密码。
default-database
默认的MySQL数据库名称。
填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。
StarRocks Catalog
代码示例
CREATE CATALOG sr WITH ( 'type' = 'starrocks', 'endpoint' = '172.16.**.**:9030', 'username' = 'test', 'password' = '123456', 'dbname' = 'test_cdc' );
参数配置
参数
说明
type
类型,固定值为starrocks。
endpoint
StarRocks FE的IP地址和端口。
username
StarRocks的用户名。
填写步骤一:准备测试数据中账号的用户名。本示例为test。
password
StarRocks数据库服务的密码。
填写步骤一:准备测试数据中账号的密码。
dbname
StarRocks数据库名称。
填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。
步骤三:创建并上线作业
在阿里云实时计算Flink控制台的作业开发页面,编写CTAS语句。
您可以使用以下三种示例发送CTAS语句。
AtLeast once语义:通过sink.buffer-flush.interval-ms配置项,配置每次写入StarRocks的时间间隔,优点是写入间隔时间短,占用内存较少。
/* AtLeast once 语义 */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl_sr with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://172.16.**.**:9030', 'load-url'='172.16.**.**:18030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '123456', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
Exactly once语义:需要定义checkpoint间隔,优点是在各种异常情况下保障数据不丢失不重复,缺点是数据可见时间取决于checkpoint间隔。更多信息,请参见Checkpointing。
/* Exactly once 语义。 */ set 'execution.checkpointing.interval' = '1 min'; set 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; set 'execution.checkpointing.timeout' = '10 min'; use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://172.16.**.**:9030', 'load-url'='172.16.**.**:18030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '123456', 'sink.semantic' = 'exactly-once', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01 ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
Simple模式:优点是创建表时不需要关注原表有哪些字段,会按照MySQL的表格式照搬过来,开发者使用比较方便。缺点是不能创建分区,对于需要分区的表,仍需要通过normal模式创建。
/* 上面两个为normal模式,本示例演示simple模式 */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'='buckets 8', 'starrocks.create.table.mode'='simple', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://172.16.**.**:9030', 'load-url'='172.16.**.**:18030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '123456', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr-test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
表 1. WITH参数
参数
是否必选
描述
starrocks.create.table.properties
是
StarRocks建表语句中除了字段定义以外的其他后缀定义,例如示例中的engine、key和buckets等。
database-name
是
StarRocks数据库名称。
本示例为test_cdc。
jdbc-url
是
用于在StarRocks中执行查询操作。
例如,jdbc:mysql://172.16.**.**:9030。其中,
172.16.**.**
为StarRocks集群的内网IP地址。load-url
是
指定FE的IP地址和HTTP端口,格式为
StarRocks集群的内网IP地址:端口
。本文以8030端口为例,实际请根据您的集群版本选择访问的端口:18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。
8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。
说明访问端口详情,请参见UI和端口。
sink.semantic
否
填写exactly-once可以保障数据一致性语义,默认为at-least-once。
starrocks.create.table.mode
否
支持以下参数值:
normal模式(默认值):必须像示例一样在starrocks.create.table.properties配置中填写engine、key和buckets等完整的配置。
simple模式:默认选择engine为olap,选择key类型为primary key,且主键与MySQL的主键保持完全一致,默认distributed by hash(所有的主键),默认无分区。需要在starrocks.create.table.properties配置中填写的必填内容为buckets ,选填内容为properties等配置。
说明因为vvr-6.0.5-flink-1.15及以上版本移除了
sink.use.new-api
,所以使用vvr-6.0.5-flink-1.15之前的版本时,请在with参数中添加'sink.use.new-api'='false',
。其他配置请参见从Apache Flink持续导入。
表 2. OPTIONS参数
参数
描述
connector
类型,固定值为mysql-cdc。
hostname
RDS的内网地址。
您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。
port
MySQL数据库服务的端口号,默认值为3306。
username
MySQL数据库服务的用户名。
填写步骤一:准备测试数据中账号的用户名。本示例为test。
password
MySQL数据库服务的密码。
填写步骤一:准备测试数据中账号的密码。
table-name
StarRocks中的表名称。
填写步骤一:准备测试数据中创建的表名。本示例为runoob_tbl。
database-name
默认的MySQL数据库名称。
填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。
在作业开发页面的高级配置中,选择vvr-6.0.3及以上的版本。
单击上线。
在作业运维页面,单击目标作业操作列的启动。
步骤四:场景演示
查询数据
使用SSH方式登录StarRocks集群,详情请参见登录集群。
执行以下,连接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
在StarRocks连接窗口执行以下命令,查看表数据。
use test_cdc; select * from runoob_tbl1;
返回信息如下,表示MySQL上的数据已同步至StarRocks。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
查询插入后的数据
在RDS数据库窗口执行以下命令,插入数据。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1)
在StarRocks连接窗口执行以下命令,查看表数据。
select * from runoob_tbl1;
返回信息如下,表示数据已成功插入。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步数据更新
在RDS数据库窗口执行以下命令,更新指定数据。
update runoob_tbl set runoob_title= 'new' where runoob_id = 18
在StarRocks连接窗口执行以下命令,查看表数据。
select * from runoob_tbl1;
返回信息如下,表示数据已同步更新。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步数据删除
在RDS数据库窗口执行以下命令,删除指定数据。
DELETE FROM runoob_tbl WHERE runoob_id = 1
在StarRocks连接窗口执行以下命令,查看表数据。
select * from runoob_tbl1;
返回信息如下,表示数据已同步删除。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
增加可空列
在RDS数据库窗口执行以下命令,增加可空列。
alter table `runoob_tbl` add COLUMN `add_col2` INT;
执行以下命令 ,插入数据。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2)
在StarRocks连接窗口执行以下命令,查看表数据。
select * from runoob_tbl1;
返回信息如下,表示Schema已经成功变更。
+-----------+--------------+---------------+-----------------+---------+----------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 | +-----------+--------------+---------------+-----------------+---------+----------+ | 18 | new | tom | 2022-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+----------+ | 1 | second | tom2 | 2022-06-23 | 1 | 2 | | 18 | first | tom | 2022-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+----------+
CDAS介绍
CDAS是CTAS的一个语法糖。通过CDAS语句,可以实现MySQL中的整库同步,即生成一个Flink Job。Source是MySQL中的database,目标表是StarRocks中对应的多张表,同时可以使用including table语法,只选择一个database中的部分表进行CDAS操作。
与CTAS的执行相同,需要在创建MySQL和StarRocks相应的Catalog后,执行CDAS语句。创建语法示例如下。
CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
'load-url'='172.16.**.**:18030',
'username'='test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000' ,
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
as DATABASE mysql.test_cdc including table 'tabl1','tbl2','tbl3'
/*+ OPTIONS ( 'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;