管理MySQL Catalog

更新时间:
复制 MD 格式

创建MySQL Catalog后,您可以在Flink控制台直接访问CatalogMySQL实例中的表,并在Flink SQL作业中使用。本文为您介绍如何创建及使用MySQL Catalog。

背景信息

MySQL Catalog具有以下功能特点:

  • 直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。

  • MySQL Catalog提供的表可以直接作为Flink SQL作业中的MySQL CDC源表、MySQL结果表和MySQL维表。

  • 支持RDS MySQL、PolarDB MySQL或自建MySQL。

  • 支持直接访问分库分表逻辑表。

  • 支持配合CDASCTAS语法完成基于MySQL数据源的整库同步、分库分表合并同步、表结构变更同步。

使用限制

  • MySQLFlink需在相同VPC下,跨VPC或公网访问时需要打通网络,详情请参见网络连通性

  • 创建后不支持修改Catalog配置信息。如需修改,请删除后重新创建。

  • 仅支持查询已有数据库和表,不支持通过Flink创建数据库和表。

  • 作为源表时仅支持流读、不支持批读。

    说明

    MySQL Catalog提供的表作为MySQL CDC源表时,需要在RDS MySQL、PolarDB MySQL或者自建MySQL上开启Binlog等配置,详情请参见配置MySQL

  • 无法识别建表语句中使用PolarDB特有语法的表。

    例如PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`)。

  • 实时计算引擎VVR 8.0.7及以上版本,创建后不支持使用视图作为Flink的表。

  • MySQL仅支持5.78.0.x版本。

创建MySQL Catalog

支持UISQL命令两种方式创建MySQL Catalog。

UI方式(推荐)

  1. 进入数据管理页面。

    1. 登录实时计算控制台,单击目标工作空间操作列下的控制台

    2. 在左侧导航栏,单击数据管理

  2. 单击创建Catalog,选择MySQL,单击下一步

  3. 填写参数配置信息。

    重要

    Catalog创建完成后不支持修改以下配置信息。如需修改,请删除已创建的Catalog后重新创建。

    参数

    说明

    是否必填

    catalogname

    自定义MySQL Catalog名称。

    hostname

    MySQL数据库的IP地址或者Hostname。

    说明

    VPC公网访问时需要打通网络,详情请参见网络连通性

    port

    MySQL数据库服务的端口号,默认值为3306。

    default-database

    默认的MySQL数据库名称。

    username

    MySQL数据库服务的用户名。

    password

    MySQL数据库服务的密码。

    为了避免AK明文等风险,建议通过变量方式填写(图片示例使用了名为mysqlpw的变量),详情请参见新增变量

  4. 单击确定

    在左侧元数据区域,可以查看创建的Catalog。

SQL命令

  1. 进入数据查询页面。

    1. 登录实时计算控制台,单击目标工作空间操作列下的控制台

    2. 在左侧导航栏,单击数据开发 > 数据查询

  2. 单击image,单击新建查询脚本,填写文件名称存储位置后,单击保存

  3. 填写如下代码。

    CREATE CATALOG YourCatalogName WITH(
      'type' = 'mysql',
      'hostname' = 'rm-bp1gcn0q0j0******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'usertest',
      'password' = '${secret_values.mysqlpw}',
      'default-database' = 'flinktest',
      'catalog.table.metadata-columns'='table_name'
    );

    参数

    说明

    是否必填

    YourCatalogName

    自定义MySQL Catalog名称。

    type

    类型,固定值为mysql。

    hostname

    MySQL数据库的IP地址或者Hostname。

    说明

    VPC公网访问时需要打通网络,详情请参见网络连通性

    port

    MySQL数据库服务的端口号,默认值为3306。

    default-database

    默认的MySQL数据库名称。

    username

    MySQL数据库服务的用户名。

    password

    MySQL数据库服务的密码。

    为了避免AK明文等风险,建议通过变量方式填写(示例使用了名为mysqlpw的变量),详情请参见新增变量

    property-version

    Catalog的参数版本。填写值为0(默认值)或1(推荐)。

    不同参数版本可用的参数集合和参数的默认值可能不同,区别详情会在具体参数的说明部分描述。

    说明
    • VVR 8.0.6及以上版本支持配置该参数。

    • VVR 11.1及以上版本默认值为1。

    catalog.table.metadata-columns

    指定获取数据表时,表的Schema需要添加MySQL CDC源表的元数据列。默认不添加元数据列。

    多个元数据列使用英文分号(;)分隔,例如:op_ts;table_name;database_name

    说明
    • 仅实时计算引擎VVR 6.0.5及以上版本支持该参数。

    • 当配置该参数时,返回的表Schema会额外添加指定的元数据列,这些列只适用于MySQL CDC源表,所以该Catalog返回的表只能用作数据源表,不可以用作结果表或维表。

    catalog.table.treat-tinyint1-as-boolean

    获取数据表Schema时,是否将MySQLTinyInt(1)和Boolean类型映射为Flink Boolean类型。参数取值如下:

    • true:映射为Boolean类型。

    • false:映射为TINYINT类型。

    参数默认值:

    • property-version=0时,默认值为true;

    • property-version=1时,默认值为false。

    说明
    • 仅实时计算引擎VVR 8.0.4及以上版本支持配置该参数。

    • 不建议MySQL使用TinyInt(1)存储01以外的数值,请选择合适的数据类型做映射,参见类型映射

  4. 选中创建Catalog的代码后,单击左侧代码行数上的运行

    返回The following statement has been executed successfully!表示创建成功。

    编辑器中的SQL代码为CREATE CATALOG myCatalog语句,配置参数包括type=mysqlhostname=rm-bp1gcn0q0j0**.mysql.rds.aliyuncs.comport=3306username=usertestpassword=${secret_values.mysqlpw}(通过密钥引用)、default-database=flinktest以及catalog.table.metadata-columns=table_name

查看和删除MySQL Catalog

UI方式(推荐)

数据管理页面,单击Catalog列表下查看已创建的Catalog名称

  • 查看:单击目标Catalog名称对应操作列的查看,查看Catalog下的数据库和表信息。

    表结构详情暂不展示字段comment信息。

  • 删除:单击目标Catalog名称对应操作列的删除

    删除操作仅删除创建的Catalog,不会在相应服务中删除这些表。删除Catalog后使用该Catalog下表的已运行作业不受影响,但重新部署或启动时会报错(无法找到该表),请您谨慎操作。

SQL命令

  1. 数据查询文本编辑区域,输入以下命令。

    --查看Catalog下表在Flink中对应的Schema信息,暂不支持展示字段comment信息。
    DESCRIBE `<catalogname>`.`<dbname>`.`<tablename>`;
    --删除Catalog
    DROP CATALOG `<catalogname>`;
    说明

    删除操作仅删除创建的Catalog,不会在相应服务中删除这些表。删除Catalog后使用该Catalog下表的已运行作业不受影响,但重新部署或启动时会报错(无法找到该表),请您谨慎操作。

  2. 选中对应的命令,鼠标右键选择运行

    执行 DESCRIBE 语句后,返回 orders_1 表的结构信息,包含字段 orderkeycustkeyorder_statustotal_price 及其对应的数据类型和属性。

使用MySQL Catalog

MySQL源表中读取数据

INSERT INTO `<othersinktable>`
SELECT ...
FROM `<mysqlcatalog>`.`<dbname>`.`<tablename>` /*+ OPTIONS('server-id' = '6000-6008') */;

MySQL Catalog作为MySQL CDC源表时,建议使用Table Hints来为作业指定不同的 server-id。如果源表需要多并发读取,server-id还需要配置成范围格式,范围中的server-id个数需要大于等于并发度。

读取MySQL分库分表逻辑表

MySQL Catalog支持使用正则表达式,将库名和表名作为逻辑表名,来读取分库分表的数据。

例如,有一个分库分表的MySQL数据库,包括user01、user02user99等多个表,分散在db01~db10等数据库中,且所有表的Schema都相互兼容,则可以通过如下正则表达式的库名表名访问到所有user的分库分表。

SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;

分库分表的逻辑表会返回额外的_db_name (STRING) 和_table_name (STRING)两个系统字段,且这两个字段与原分表的主键会作为逻辑表的新联合主键以保证主键的唯一性。如果user01~user99的主键均为id,则user逻辑表的联合主键为(_db_name, _table_name, id)。

MySQL Catalog支持结合正则表达式匹配所要同步的多张表,实现分库分表合并同步,具体示例请参见分库分表合并同步

使用CTASCDAS实时同步MySQL数据变更和结构变更

CTAS支持单表同步、表结构变更同步、分库分表合并同步、自定义计算列同步,支持新增CTAS语句加入数据同步作业,具体示例及详情请参见CREATE TABLE AS(CTAS)语句。CDAS支持整库级别的表结构和数据的实时同步,以及表结构变更的同步,详情请参见CREATE DATABASE AS(CDAS)语句

-- 单表同步,实时同步表级别的表结构变更和数据变更。
CREATE TABLE IF NOT EXISTS `<targetcatalog>`.`<targetdbname>`.`<targettablename>`
WITH (...)
AS TABLE `<mysqlcatalog>`.`<dbname>`.`<tablename>`
/*+ OPTIONS('server-id'='6000-6018') */;
-- 整库同步,实时同步整库级别的表结构变更和数据变更。
CREATE DATABASE `<targetcatalog>`.`<targetdbname>` WITH (...)
AS DATABASE `<mysqlcatalog>`.`<dbname>` INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='6000-6018') */;   

例如,将MySQL数据同步到Hologres中,示例及详情请参见使用Hologres Catalog

USE CATALOG holocatalog; --指定使用的catalog
CREATE TABLE IF NOT EXISTS holotable  --指定同步后的表名,未填写数据库层级时自动同步到catalog下默认的数据库  
WITH ('jdbcWriteBatchSize' = '1024')   -- 可选,指定结果表的参数。
AS TABLE mysqlcatalog.dbmysql.mysqltable   
/*+ OPTIONS('server-id'='8001-8004') */;  -- 指定mysql-cdc源表的额外参数。

MySQL维表中读取数据

INSERT INTO `<othersinktable>`
SELECT ...
FROM `<othersourcetable>` AS e
JOIN `<mysqlcatalog>`.`<dbname>`.`<tablename>` FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;

写入数据至MySQL

INSERT INTO `<mysqlcatalog>`.`<dbname>`.`<tablename>`
SELECT ...
FROM `<othersourcetable>`