PolarDB PostgreSQL版(兼容Oracle)支持使用逻辑复制来复制数据定义命令 (DDL)。
背景信息
原生PostgreSQL仅支持表数据的同步,需要用户手动在发布端和订阅端建立相同定义的表,才能确保表数据正确的同步。
PolarDB PostgreSQL版(兼容Oracle)扩展了逻辑复制的能力,支持数据定义语言(DDL)逻辑复制,即通过发布订阅的方式可以复制数据库对象的CREATE/ALTER/DROP行为到订阅端。
前提条件
DDL逻辑复制功能需要将wal_level参数设置为logical,修改参数的具体操作请参见设置集群参数。
语法
- CREATE PUBLICATION- CREATE PUBLICATION name [ FOR TABLE [ ONLY ] table_name [ * ] [, ...] | FOR ALL TABLES ] [ WITH ( publication_parameter [= value] [, ... ] ) ] publication_parameter: ... pubddl = '(none | table | all)'- 其中, - publication_parameter增加了- pubddl参数,取值为- none、- table、- all。- none(默认):表示不复制DDL。 
- table:表示仅复制 - table相关的DDL语句。- CREATE TABLE
- ALTER TABLE
- DROP TABLE
- CREATE TABLE AS
 
- all:表示复制所有的DDL语句,目前支持的DDL语句如下: - ALTER INDEX
- ALTER SEQUENCE
- ALTER TABLE
- ALTER TYPE
- CREATE INDEX
- CREATE SCHEMA
- CREATE SEQUENCE
- CREATE TABLE
- CREATE TABLE AS
- CREATE TYPE
- CREATE TYPE HEADER
- CREATE TYPE BODY
- DROP INDEX
- DROP SCHEMA
- DROP SEQUENCE
- DROP TABLE
- DROP TYPE说明- 如果指定 - pubddl = 'all',则必须指定- FOR ALL TABLES。 全局命令可以在任何数据库执行,目前不支持复制,全局命令包括- ROLE、- DATABASE、- TableSpace语句和一些- GrantStmt/- RevokeStmt(如果目标对象是全局对象)。
 
 
- CREATE SUBSCRIPTION- CREATE SUBSCRIPTION subscription_name CONNECTION 'conninfo' PUBLICATION publication_name [, ...] [ WITH ( subscription_parameter [= value] [, ... ] ) ] subscription_parameter: ... dump_schema = false/true- 其中, - subscription_parameter新增- dump_schema参数,支持在创建订阅时将发布端的存量对象定义- dump到订阅端,默认值为false,取值如下:- false(默认):表示不支持在创建订阅时将发布端的存量对象定义 - dump到订阅端。
- true:表示支持在创建订阅时将发布端的存量对象定义 - dump到订阅端。
 
dump_schema使用了pg_dump/pg_restore工具,需要确保集群访问限制支持host='127.0.0.1'连接,否则会恢复失败。 dump的文件存放在集群的本地目录pg_logical/schemadumps中,恢复或出错后会删除。
参数说明
| 参数 | 说明 | 
| polar_enable_ddl_replication | 开启或关闭DDL逻辑复制功能。取值如下: 
 | 
| polar_enable_debug_ddl_replication | 开启或关闭debug ddl replicaiton,打印更多日志。取值如下: true:开启debug ddl replicaiton。 false(默认):关闭debug ddl replicaiton。 | 
示例
- 创建一个支持DDL的发布。 - CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');- 显示结果如下: - CREATE PUBLICATION
- 创建一个订阅。 - CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;- 显示结果如下: - NOTICE: created replication slot "mysub" on publisher CREATE SUBSCRIPTION
- 在发布端执行SQL。 - # 创建表 CREATE TABLE t1(id int ,val char(3)); # 插入数据 INSERT INTO t1 values (1,'a'); INSERT INTO t1 values (2,'b'); INSERT INTO t1 values (3,'c'); # 修改表 ALTER TABLE t1 ADD COLUMN c int GENERATED BY DEFAULT AS IDENTITY, ALTER COLUMN c SET GENERATED ALWAYS; # 查看表内容 SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # 查看注释 \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Publications: "mypub" Replica Identity: FULL Access method: heap
- 在订阅端查看复制情况。 - # 查看表内容 SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # 查看注释 \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Replica Identity: FULL Access method: heap
- 在发布端删除表。 - DROP TABLE t1;
- 在订阅端查看复制情况。 - SELECT * FROM t1;- 显示结果如下: - ERROR: relation "t1" does not exist LINE 1: SELECT * FROM t1;
解码插件
解码插件新增以下两个回调接口。
/*
 * Output plugin callbacks
 */
typedef struct OutputPluginCallbacks
{
  ...
	LogicalDecodeDDLMessageCB ddl_cb;
	/* streaming of changes */
	...
	LogicalDecodeStreamDDLMessageCB stream_ddl_cb;
} OutputPluginCallbacks;
/*
 * Called for the logical decoding DDL messages.
 */
typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
										   ReorderBufferTXN *txn,
										   XLogRecPtr message_lsn,
										   const char *prefix,
										   Oid relid,
										   DeparsedCommandType cmdtype,
										   Size message_size,
										   const char *message);
/*
 * Callback for streaming logical decoding DDL messages from in-progress
 * transactions.
 */
typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
												 ReorderBufferTXN *txn,
												 XLogRecPtr message_lsn,
												 const char *prefix,
												 Oid relid,
												 DeparsedCommandType cmdtype,
												 Size message_size,
												 const char *message);test_decoding插件中已经实现ddl message的方法,使用test decoding插件的方法如下:
CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
create table t3(id int);
SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn     | xid |                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                     
                                                                        data                                                                                                                                                                         
                                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                     
                                                                                                                                  
------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------
 0/C001BF10 | 783 | BEGIN 783
 0/C001EBC0 | 783 | message: prefix: deparse, relid: 16418, cmdtype: Simple, sz: 1505 content:{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s
 %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": fal
se, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "
present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{co
llation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": 
false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_
type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}
 0/C001EE98 | 783 | COMMIT 783
select polar_catalog.ddl_deparse_expand_command('{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}');
                       ddl_deparse_expand_command                        
-------------------------------------------------------------------------
 CREATE  TABLE  public.t3 (id pg_catalog.int4 STORAGE plain      )      
(1 row)
新增系统表与系统函数
- polar_catalog.ddl_deparse_to_json - 定义: - ddl_deparse_to_json(IN pg_ddl_command) RETURN text
- 说明:将内部的parsetree解析成JSON字符串。 
- 参数:输入pg_ddl_command类型parsetree, 返回text类型的JSON串。 
 
- polar_catalog.ddl_deparse_expand_command - 定义: - ddl_deparse_expand_command(IN text) RETURN text
- 说明:将JSON string解析成SQL string。 
- 参数:输入text类型JSON串, 返回text类型的SQL字符串。 
 
- polar_catalog.polar_publication - 定义如下: - TABLE polar_publication ( puboid Oid primary key, -- publication oid pubddl "char", -- publication是否支持ddl object pubglobal "char", -- publication是否支持global object(未来支持) pubflags int -- 预留标记位 );