PolarDB PostgreSQL版(兼容Oracle)提供wal2json插件,可以将逻辑日志文件输出为JSON格式。
前提条件
支持的PolarDB PostgreSQL版(兼容Oracle)的版本如下:
Oracle语法兼容 2.0(内核小版本2.0.14.1.0及以上)
您可通过如下语句查看PolarDB PostgreSQL版(兼容Oracle)的内核小版本号:
SHOW polar_version;
背景信息
wal2json是逻辑解码插件,具体功能如下:
可以访问由
INSERT
和UPDATE
生成的元组。根据配置的副本身份,可以访问
UPDATE
和DELETE
旧行版本。可以使用流协议(逻辑复制插槽)或特殊的SQL API来使用更改。
wal2json插件会在每个事务中生成一个JSON对象。JSON对象中提供了所有新/旧元组,额外选项还可以包括事务时间戳、限定架构、数据类型、事务ID等属性。详情请参见通过SQL获取JSON对象。
注意事项
通过SQL获取JSON对象
wal2json插件不需要通过CREATE EXTENSION
来创建,而是通过逻辑复制槽来装载wal2json插件进行使用。
创建含有wal2json插件的逻辑复制槽后,通过如下命令获取WAL中的JSON对象。
-- 创建带主键和不带主键的表 CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); -- 创建wal2json类型的逻辑复制槽 SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json'); -- 提交事务,写入WAL BEGIN; INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table2_with_pk (b, c) VALUES('Replication', now()); DELETE FROM table2_with_pk WHERE a < 3; INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir'); UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; -- 获取WAL中的JSON对象 SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');
返回结果如下:
{ "change": [ { "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"] } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [1, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [2, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "insert", "schema": "public", "table": "table2_without_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "numeric(5,2)", "text"], "columnvalues": [1, 2.34, "Tapir"] } ] }
删除名为
test_slot
的复制槽,同时返回字符串'stop'
。SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
参数说明
wal2json相关参数说明如下:
参数 | 说明 |
change | 每次DML的WAL条目。例如,Insert、Update、Delete、Truncate的WAL记录。 |
changeset | 若干个change的集合。 |
include-xids | 用于控制是否添加xid到每一个changeset,默认值为false,取值如下:
|
include-timestamp | 用于控制是否添加timestamp到每一个changeset,默认值为false,取值如下:
|
include-schemas | 用于控制是否添加schema到每一个change,默认值为true,取值如下:
|
include-types | 用于控制是否添加type到每一个change,默认值为true,取值如下:
|
include-typmod | 将修饰符添加到具有修饰符的类型(例如,varchar(20)而不是varchar),默认值为true,取值如下:
|
include-type-oids | 用于控制是否添加类型oids,默认值为false,取值如下:
|
include-not-null | 用于控制是否添加
|
pretty-print | 用于控制是否向JSON结构添加空格和缩进,进行格式化,默认值为false,取值如下:
|
write-in-chunks | 用于控制是否是每次change后都写,而不是每个changeset,默认值为false,取值如下:
|
include-lsn | 用于控制是否添加nextlsn到每一个changeset,默认值为false,取值如下:
|
filter-tables | 排除指定表。默认为空,表示不过滤任何表。 说明
|
add-tables | 指定解析特定表,默认解析所有schema下的所有表。用法和filter-tables相同。 |
filter-msg-prefixes | 排除指定prefix的行,通常用于 |
add-msg-prefixes | 增加指定prefix的行,通常用于 |
format-version | 定义使用哪种输出格式,默认值为1。取值如下:
|
actions | 定义输出哪种操作。默认为所有(Insert,Update,Delete和Truncate)。如果您使用 |
示例
以include-xids
为例说明参数如何使用。
创建表和逻辑复制槽,并插入一行数据。
DROP TABLE IF EXISTS tbl; CREATE TABLE tbl (id int); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); INSERT INTO tbl VALUES (1);
将参数名和参数内容依次填入函数。
SELECT count(*) = 1, count(distinct ((data::json)->'xid')::text) = 1 FROM pg_logical_slot_get_changes( 'regression_slot', NULL, NULL, 'format-version', '1', 'include-xids', '1');
原理设计
更多信息和原理设计请参见官方使用文档。