PolarDB PostgreSQL版支持使用逻辑复制来复制数据定义命令 (DDL)。
背景信息
原生PostgreSQL仅支持表数据的同步,需要用户手动在发布端和订阅端建立相同定义的表,才能确保表数据正确的同步。
PolarDB PostgreSQL版扩展了逻辑复制的能力,支持数据定义语言(DDL)逻辑复制,即通过发布订阅的方式可以复制数据库对象的CREATE/ALTER/DROP
行为到订阅端。
前提条件
DDL逻辑复制功能需要将wal_level
参数设置为logical
,修改参数的具体操作请参见设置集群参数。
语法
CREATE PUBLICATION
CREATE PUBLICATION name [ FOR TABLE [ ONLY ] table_name [ * ] [, ...] | FOR ALL TABLES ] [ WITH ( publication_parameter [= value] [, ... ] ) ] publication_parameter: ... pubddl = '(none | table | all)'
其中,
publication_parameter
增加了pubddl
参数,取值为none
、table
、all
。none(默认):表示不复制DDL。
table:表示仅复制
table
相关的DDL语句。CREATE TABLE
ALTER TABLE
DROP TABLE
CREATE TABLE AS
all:表示复制所有的DDL语句,目前支持的DDL语句如下:
ALTER INDEX
ALTER SEQUENCE
ALTER TABLE
ALTER TYPE
CREATE INDEX
CREATE SCHEMA
CREATE SEQUENCE
CREATE TABLE
CREATE TABLE AS
CREATE TYPE
CREATE TYPE HEADER
CREATE TYPE BODY
DROP INDEX
DROP SCHEMA
DROP SEQUENCE
DROP TABLE
DROP TYPE
说明如果指定
pubddl = 'all'
,则必须指定FOR ALL TABLES
。 全局命令可以在任何数据库执行,目前不支持复制,全局命令包括ROLE
、DATABASE
、TableSpace
语句和一些GrantStmt
/RevokeStmt
(如果目标对象是全局对象)。
CREATE SUBSCRIPTION
CREATE SUBSCRIPTION subscription_name CONNECTION 'conninfo' PUBLICATION publication_name [, ...] [ WITH ( subscription_parameter [= value] [, ... ] ) ] subscription_parameter: ... dump_schema = false/true
其中,
subscription_parameter
新增dump_schema
参数,支持在创建订阅时将发布端的存量对象定义dump
到订阅端,默认值为false,取值如下:false(默认):表示不支持在创建订阅时将发布端的存量对象定义
dump
到订阅端。true:表示支持在创建订阅时将发布端的存量对象定义
dump
到订阅端。
dump_schema
使用了pg_dump/pg_restore工具,需要确保集群访问限制支持host='127.0.0.1'
连接,否则会恢复失败。 dump的文件存放在集群的本地目录pg_logical/schemadumps
中,恢复或出错后会删除。
参数说明
参数 | 说明 |
polar_enable_ddl_replication | 开启或关闭DDL逻辑复制功能。取值如下:
|
polar_enable_debug_ddl_replication | 开启或关闭debug ddl replicaiton,打印更多日志。取值如下: true:开启debug ddl replicaiton。 false(默认):关闭debug ddl replicaiton。 |
示例
创建一个支持DDL的发布。
CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
显示结果如下:
CREATE PUBLICATION
创建一个订阅。
CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;
显示结果如下:
NOTICE: created replication slot "mysub" on publisher CREATE SUBSCRIPTION
在发布端执行SQL。
# 创建表 CREATE TABLE t1(id int ,val char(3)); # 插入数据 INSERT INTO t1 values (1,'a'); INSERT INTO t1 values (2,'b'); INSERT INTO t1 values (3,'c'); # 修改表 ALTER TABLE t1 ADD COLUMN c int GENERATED BY DEFAULT AS IDENTITY, ALTER COLUMN c SET GENERATED ALWAYS; # 查看表内容 SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # 查看注释 \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Publications: "mypub" Replica Identity: FULL Access method: heap
在订阅端查看复制情况。
# 查看表内容 SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # 查看注释 \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Replica Identity: FULL Access method: heap
在发布端删除表。
DROP TABLE t1;
在订阅端查看复制情况。
SELECT * FROM t1;
显示结果如下:
ERROR: relation "t1" does not exist LINE 1: SELECT * FROM t1;
解码插件
解码插件新增以下两个回调接口。
/*
* Output plugin callbacks
*/
typedef struct OutputPluginCallbacks
{
...
LogicalDecodeDDLMessageCB ddl_cb;
/* streaming of changes */
...
LogicalDecodeStreamDDLMessageCB stream_ddl_cb;
} OutputPluginCallbacks;
/*
* Called for the logical decoding DDL messages.
*/
typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
const char *prefix,
Oid relid,
DeparsedCommandType cmdtype,
Size message_size,
const char *message);
/*
* Callback for streaming logical decoding DDL messages from in-progress
* transactions.
*/
typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
const char *prefix,
Oid relid,
DeparsedCommandType cmdtype,
Size message_size,
const char *message);
test_decoding
插件中已经实现ddl message的方法,使用test decoding
插件的方法如下:
CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
create table t3(id int);
SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid |
data
------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------
0/C001BF10 | 783 | BEGIN 783
0/C001EBC0 | 783 | message: prefix: deparse, relid: 16418, cmdtype: Simple, sz: 1505 content:{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s
%{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": fal
se, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "
present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{co
llation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present":
false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_
type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}
0/C001EE98 | 783 | COMMIT 783
select polar_catalog.ddl_deparse_expand_command('{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}');
ddl_deparse_expand_command
-------------------------------------------------------------------------
CREATE TABLE public.t3 (id pg_catalog.int4 STORAGE plain )
(1 row)
新增系统表与系统函数
polar_catalog.ddl_deparse_to_json
定义:
ddl_deparse_to_json(IN pg_ddl_command) RETURN text
说明:将内部的parsetree解析成JSON字符串。
参数:输入pg_ddl_command类型parsetree, 返回text类型的JSON串。
polar_catalog.ddl_deparse_expand_command
定义:
ddl_deparse_expand_command(IN text) RETURN text
说明:将JSON string解析成SQL string。
参数:输入text类型JSON串, 返回text类型的SQL字符串。
polar_catalog.polar_publication
定义如下:
TABLE polar_publication ( puboid Oid primary key, -- publication oid pubddl "char", -- publication是否支持ddl object pubglobal "char", -- publication是否支持global object(未来支持) pubflags int -- 预留标记位 );