创建MySQL Catalog后,您可以在Flink控制台直接访问Catalog下MySQL实例中的表,并在Flink SQL作业中使用。本文为您介绍如何创建及使用MySQL Catalog。
背景信息
MySQL Catalog具有以下功能特点:
直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。
MySQL Catalog提供的表可以直接作为Flink SQL作业中的MySQL CDC源表、MySQL结果表和MySQL维表。
支持RDS MySQL、PolarDB MySQL或自建MySQL。
支持直接访问分库分表逻辑表。
支持配合CDAS和CTAS语法完成基于MySQL数据源的整库同步、分库分表合并同步、表结构变更同步。
使用限制
MySQL与Flink需在相同VPC下,跨VPC或公网访问时需要打通网络,详情请参见网络连通性。
创建后不支持修改Catalog配置信息。如需修改,请删除后重新创建。
仅支持查询已有数据库和表,不支持通过Flink创建数据库和表。
作为源表时仅支持流读、不支持批读。
说明MySQL Catalog提供的表作为MySQL CDC源表时,需要在RDS MySQL、PolarDB MySQL或者自建MySQL上开启Binlog等配置,详情请参见配置MySQL。
无法识别建表语句中使用PolarDB特有语法的表。
例如
PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`)。
实时计算引擎VVR 8.0.7及以上版本,创建后不支持使用视图作为Flink的表。
MySQL仅支持5.7和8.0.x版本。
创建MySQL Catalog
支持UI与SQL命令两种方式创建MySQL Catalog。
UI方式(推荐)
进入数据管理页面。
登录实时计算控制台,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击数据管理。
单击创建Catalog,选择MySQL,单击下一步。
填写参数配置信息。
重要Catalog创建完成后不支持修改以下配置信息。如需修改,请删除已创建的Catalog后重新创建。
参数
说明
是否必填
catalogname
自定义MySQL Catalog名称。
是
hostname
MySQL数据库的IP地址或者Hostname。
说明跨VPC或公网访问时需要打通网络,详情请参见网络连通性。
是
port
MySQL数据库服务的端口号,默认值为3306。
否
default-database
默认的MySQL数据库名称。
是
username
MySQL数据库服务的用户名。
是
password
MySQL数据库服务的密码。
为了避免AK明文等风险,建议通过变量方式填写(图片示例使用了名为mysqlpw的变量),详情请参见新增变量。
是
单击确定。
在左侧元数据区域,可以查看创建的Catalog。
SQL命令
进入数据查询页面。
登录实时计算控制台,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击
。
单击
,新建查询脚本,填写如下代码。
CREATE CATALOG <yourcatalogname> WITH( 'type' = 'mysql', 'hostname' = 'rm-bp1gcn0q0j0******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'usertest', 'password' = '${secret_values.mysqlpw}', 'default-database' = 'flinktest', 'catalog.table.metadata-columns'='table_name' );
参数
说明
是否必填
yourcatalogname
自定义MySQL Catalog名称。
重要无需保留尖括号(<>),否则语法检查会报错。
是
type
类型,固定值为mysql。
是
hostname
MySQL数据库的IP地址或者Hostname。
说明跨VPC或公网访问时需要打通网络,详情请参见网络连通性。
是
port
MySQL数据库服务的端口号,默认值为3306。
否
default-database
默认的MySQL数据库名称。
是
username
MySQL数据库服务的用户名。
是
password
MySQL数据库服务的密码。
为了避免AK明文等风险,建议通过变量方式填写(示例使用了名为mysqlpw的变量),详情请参见新增变量。
是
property-version
Catalog的参数版本。填写值为0(默认值)或1(推荐)。
不同参数版本可用的参数集合和参数的默认值可能不同,区别详情会在具体参数的说明部分描述。
说明仅VVR 8.0.6及以上版本支持配置该参数。
否
catalog.table.metadata-columns
指定获取数据表时,表的Schema需要添加MySQL CDC源表的元数据列。默认不添加元数据列。
多个元数据列使用英文分号(;)分隔,例如:
op_ts;table_name;database_name
。说明仅实时计算引擎VVR 6.0.5及以上版本支持该参数。
当配置该参数时,返回的表Schema会额外添加指定的元数据列,这些列只适用于MySQL CDC源表,所以该Catalog返回的表只能用作数据源表,不可以用作结果表或维表。
否
catalog.table.treat-tinyint1-as-boolean
获取数据表Schema时,是否将MySQL的TinyInt(1)和Boolean类型映射为Flink Boolean类型。参数取值如下:
true:映射为Boolean类型。
false:映射为TINYINT类型。
参数默认值:
property-version=0时,默认值为true;
property-version=1时,默认值为false。
说明仅实时计算引擎VVR 8.0.4及以上版本支持配置该参数。
不建议MySQL使用TinyInt(1)存储0和1以外的数值,请选择合适的数据类型做映射,参见类型映射。
否
选中创建Catalog的代码后,单击左侧代码行数上的运行。
返回
The following statement has been executed successfully!
表示创建成功。
查看和删除MySQL Catalog
UI方式(推荐)
在数据管理页面,单击Catalog列表下查看已创建的Catalog名称和类。
查看:单击目标Catalog名称对应操作列的查看,查看Catalog下的数据库和表信息。
表结构详情暂不展示字段comment信息。
删除:单击目标Catalog名称对应操作列的删除。
删除操作仅删除创建的Catalog,不会在相应服务中删除这些表。删除Catalog后使用该Catalog下表的已运行作业不受影响,但重新部署或启动时会报错(无法找到该表),请您谨慎操作。
SQL命令
在数据查询文本编辑区域,输入以下命令。
--查看Catalog下表在Flink中对应的Schema信息,暂不支持展示字段comment信息。 DESCRIBE `<catalogname>`.`<dbname>`.`<tablename>`; --删除Catalog DROP CATALOG `<catalogname>`;
说明删除操作仅删除创建的Catalog,不会在相应服务中删除这些表。删除Catalog后使用该Catalog下表的已运行作业不受影响,但重新部署或启动时会报错(无法找到该表),请您谨慎操作。
选中对应的命令,鼠标右键选择运行。
使用MySQL Catalog
从MySQL源表中读取数据
INSERT INTO `<othersinktable>`
SELECT ...
FROM `<mysqlcatalog>`.`<dbname>`.`<tablename>` /*+ OPTIONS('server-id' = '6000-6008') */;
MySQL Catalog作为MySQL CDC源表时,建议使用Table Hints来为作业指定不同的 server-id。如果源表需要多并发读取,server-id还需要配置成范围格式,范围中的server-id个数需要大于等于并发度。
读取MySQL分库分表逻辑表
MySQL Catalog支持使用正则表达式,将库名和表名作为逻辑表名,来读取分库分表的数据。
例如,有一个分库分表的MySQL数据库,包括user01、user02和user99等多个表,分散在db01~db10等数据库中,且所有表的Schema都相互兼容,则可以通过如下正则表达式的库名表名访问到所有user的分库分表。
SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;
分库分表的逻辑表会返回额外的_db_name (STRING) 和_table_name (STRING)两个系统字段,且这两个字段与原分表的主键会作为逻辑表的新联合主键以保证主键的唯一性。如果user01~user99的主键均为id,则user逻辑表的联合主键为(_db_name, _table_name, id)。
MySQL Catalog支持结合正则表达式匹配所要同步的多张表,实现分库分表合并同步,具体示例请参见示例二:分库分表合并同步。
使用CTAS和CDAS实时同步MySQL数据变更和结构变更
同步时,请确认CTAS和CDAS支持的上下游存储列表,例如MongoDB不支持作为结果表。
CTAS支持单表同步、表结构变更同步、分库分表合并同步、自定义计算列同步,支持新增CTAS语句加入数据同步作业,具体示例及详情请参见CREATE TABLE AS(CTAS)语句。CDAS支持整库级别的表结构和数据的实时同步,以及表结构变更的同步,详情请参见CREATE DATABASE AS(CDAS)语句。
-- 单表同步,实时同步表级别的表结构变更和数据变更。
CREATE TABLE IF NOT EXISTS `<targetcatalog>`.`<targetdbname>`.`<targettablename>`
WITH (...)
AS TABLE `<mysqlcatalog>`.`<dbname>`.`<tablename>`
/*+ OPTIONS('server-id'='6000-6018') */;
-- 整库同步,实时同步整库级别的表结构变更和数据变更。
CREATE DATABASE `<targetcatalog>`.`<targetdbname>` WITH (...)
AS DATABASE `<mysqlcatalog>`.`<dbname>` INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='6000-6018') */;
例如,将MySQL数据同步到Hologres中,示例及详情请参见使用Hologres Catalog。
USE CATALOG holocatalog; --指定使用的catalog
CREATE TABLE IF NOT EXISTS holotable --指定同步后的表名,未填写数据库层级时自动同步到catalog下默认的数据库
WITH ('jdbcWriteBatchSize' = '1024') -- 可选,指定结果表的参数。
AS TABLE mysqlcatalog.dbmysql.mysqltable
/*+ OPTIONS('server-id'='8001-8004') */; -- 指定mysql-cdc源表的额外参数。
从MySQL维表中读取数据
INSERT INTO `<othersinktable>`
SELECT ...
FROM `<othersourcetable>` AS e
JOIN `<mysqlcatalog>`.`<dbname>`.`<tablename>` FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
写入数据至MySQL表
INSERT INTO `<mysqlcatalog>`.`<dbname>`.`<tablename>`
SELECT ...
FROM `<othersourcetable>`