PostgreSQL Catalog 用于管理 PostgreSQL 元数据。配置后,无需在 Flink 中手动重建表结构,即可直接读写 PostgreSQL 数据。
使用限制
仅VVR 11.4及以上版本支持创建PostgreSQL Catalog。
创建PostgreSQL Catalog
在数据查询文本编辑区域,运行以下 SQL 语句创建 Catalog。
CREATE CATALOG `postgres` WITH (
'type' = 'postgres',
'default-database' = 'postgres',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>'
);参数名称 | 是否必选 | 默认值 | 描述 |
type | 是 | 无 | Catalog 类型。固定为 |
hostname | 是 | 无 | PostgreSQL 数据库连接地址。 |
port | 否 | 5432 | 数据库端口号。 |
username | 是 | 无 | 访问数据库的用户名。 |
password | 是 | 无 | 访问数据库的密码。 |
default-database | 是 | 无 | 默认连接的数据库名称。 |
查看PostgreSQL Catalog
创建成功后,使用以下命令查看数据库和表。
USE CATALOG `postgres`;
SHOW DATABASES;
USE `postgres`;
SHOW TABLES;使用PostgreSQL Catalog
读取数据
您可以直接读取 PostgreSQL 表中的数据。若需配置 CDC 参数(如 Replication Slot),请使用 SQL Hint (OPTIONS) 覆盖配置。
SELECT *
FROM `postgres`.`postgres`.`public.target_table`
/*+ OPTIONS(
'slot.name' = 'testName',
'debezium.publication.autocreate.mode' = 'filtered'
) */;写入数据
INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name
FROM `source_table`;维表查询
INSERT INTO sink_table
SELECT
o.order_id,
o.user_id,
d.user_name,
o.amount
FROM pg_catalog.db.orders AS o
JOIN mysql_dim.db.users FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;删除PostgreSQL Catalog
若不再使用,运行以下命令删除 Catalog。此操作仅删除 Flink 中的元数据映射。
DROP CATALOG `postgres`;该文章对您有帮助吗?