管理PostgreSQL Catalog

PostgreSQL Catalog 用于管理 PostgreSQL 元数据。配置后,无需在 Flink 中手动重建表结构,即可直接读写 PostgreSQL 数据。

使用限制

VVR 11.4及以上版本支持创建PostgreSQL Catalog。

创建PostgreSQL Catalog

数据查询文本编辑区域,运行以下 SQL 语句创建 Catalog。

CREATE CATALOG `postgres` WITH (
    'type' = 'postgres',
    'default-database' = 'postgres',                      
    'hostname' = '<yourHostname>',
    'port' = '5432',                                       
    'username' = '<yourUserName>',                        
    'password' = '<yourPassWord>'                           
);

参数名称

是否必选

默认值

描述

type

Catalog 类型。固定为postgres

hostname

PostgreSQL 数据库连接地址。

port

5432

数据库端口号。

username

访问数据库的用户名。

password

访问数据库的密码。

default-database

默认连接的数据库名称。

查看PostgreSQL Catalog

创建成功后,使用以下命令查看数据库和表。

USE CATALOG `postgres`;
SHOW DATABASES;

USE `postgres`;
SHOW TABLES;

使用PostgreSQL Catalog

读取数据

您可以直接读取 PostgreSQL 表中的数据。若需配置 CDC 参数(如 Replication Slot),请使用 SQL Hint (OPTIONS) 覆盖配置。

SELECT * 
FROM `postgres`.`postgres`.`public.target_table` 
/*+ OPTIONS(
    'slot.name' = 'testName', 
    'debezium.publication.autocreate.mode' = 'filtered'
) */;

写入数据

INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name 
FROM `source_table`;

维表查询

INSERT INTO sink_table
SELECT 
  o.order_id,
  o.user_id,
  d.user_name,  
  o.amount
FROM pg_catalog.db.orders AS o
JOIN mysql_dim.db.users FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;

删除PostgreSQL Catalog

若不再使用,运行以下命令删除 Catalog。此操作仅删除 Flink 中的元数据映射。

DROP CATALOG `postgres`;