wal2json(解码为JSON)

PolarDB PostgreSQL版(兼容Oracle)提供wal2json插件,可以将逻辑日志文件输出为JSON格式。

前提条件

支持的PolarDB PostgreSQL版(兼容Oracle)的版本如下:

Oracle语法兼容 2.0(内核小版本2.0.14.1.0及以上)

说明

您可通过如下语句查看PolarDB PostgreSQL版(兼容Oracle)的内核小版本号:

SHOW polar_version;

背景信息

wal2json是逻辑解码插件,具体功能如下:

  • 可以访问由INSERTUPDATE生成的元组。

  • 根据配置的副本身份,可以访问UPDATEDELETE旧行版本。

  • 可以使用流协议(逻辑复制插槽)或特殊的SQL API来使用更改。

wal2json插件会在每个事务中生成一个JSON对象。JSON对象中提供了所有新/旧元组,额外选项还可以包括事务时间戳、限定架构、数据类型、事务ID等属性。详情请参见通过SQL获取JSON对象

注意事项

  • 由于采用的复制方式为REPLICA_IDENTITY_FULL,因此在更新和删除时,所显示的数据为整行数据,而非默认的仅涉及更新和删除前后变化的列。如需修改为仅涉及更新前后变化的列,需要关闭 polar_create_table_with_full_replica_identity 参数,该参数无法通过控制台进行修改,请联系我们处理。

  • wal2json插件依赖于逻辑编解码功能,wal_level参数的值需设置为logical

    说明

    您可以通过控制台设置wal_level参数,详细操作请参考设置集群参数。修改该参数后集群将会重启,请在修改参数前做好业务安排,谨慎操作。

通过SQL获取JSON对象

wal2json插件不需要通过CREATE EXTENSION来创建,而是通过逻辑复制槽来装载wal2json插件进行使用。

  1. 创建含有wal2json插件的逻辑复制槽后,通过如下命令获取WAL中的JSON对象。

    -- 创建带主键和不带主键的表
    CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
    CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
    
    -- 创建wal2json类型的逻辑复制槽
    SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
    
    -- 提交事务,写入WAL
    BEGIN;
    INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
    INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
    DELETE FROM table2_with_pk WHERE a < 3;
    
    INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
    UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
    COMMIT;
    
    -- 获取WAL中的JSON对象
    SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');

    返回结果如下:

    {
        "change": [
            {
                "kind": "insert",
                "schema": "public",
                "table": "table2_with_pk",
                "columnnames": ["a", "b", "c"],
                "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
            }
            ,{
                "kind": "insert",
                "schema": "public",
                "table": "table2_with_pk",
                "columnnames": ["a", "b", "c"],
                "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
            }
            ,{
                "kind": "insert",
                "schema": "public",
                "table": "table2_with_pk",
                "columnnames": ["a", "b", "c"],
                "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
            }
            ,{
                "kind": "delete",
                "schema": "public",
                "table": "table2_with_pk",
                "oldkeys": {
                    "keynames": ["a", "c"],
                    "keytypes": ["integer", "timestamp without time zone"],
                    "keyvalues": [1, "2018-03-27 12:05:29.914496"]
                }
            }
            ,{
                "kind": "delete",
                "schema": "public",
                "table": "table2_with_pk",
                "oldkeys": {
                    "keynames": ["a", "c"],
                    "keytypes": ["integer", "timestamp without time zone"],
                    "keyvalues": [2, "2018-03-27 12:05:29.914496"]
                }
            }
            ,{
                "kind": "insert",
                "schema": "public",
                "table": "table2_without_pk",
                "columnnames": ["a", "b", "c"],
                "columntypes": ["integer", "numeric(5,2)", "text"],
                "columnvalues": [1, 2.34, "Tapir"]
            }
        ]
    }
  2. 删除名为test_slot的复制槽,同时返回字符串 'stop'

    SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

参数说明

wal2json相关参数说明如下:

参数

说明

change

每次DML的WAL条目。例如,Insert、Update、Delete、Truncate的WAL记录。

changeset

若干个change的集合。

include-xids

用于控制是否添加xid到每一个changeset,默认值为false,取值如下:

  • true:添加xid到每一个changeset。

  • false(默认):不添加xid到每一个changeset。

include-timestamp

用于控制是否添加timestamp到每一个changeset,默认值为false,取值如下:

  • true:添加timestamp到每一个changeset。

  • false(默认):不添加timestamp到每一个changeset。

include-schemas

用于控制是否添加schema到每一个change,默认值为true,取值如下:

  • true(默认):添加schema到每一个change。

  • false:不添加schema到每一个change。

include-types

用于控制是否添加type到每一个change,默认值为true,取值如下:

  • true(默认):添加type到每一个change。

  • false:不添加type到每一个change。

include-typmod

将修饰符添加到具有修饰符的类型(例如,varchar(20)而不是varchar),默认值为true,取值如下:

  • true(默认):将修饰符添加到具有修饰符的类型。

  • false:不将修饰符添加到具有修饰符的类型。

include-type-oids

用于控制是否添加类型oids,默认值为false,取值如下:

  • true:添加类型oids。

  • false(默认):不添加类型oids。

include-not-null

用于控制是否添加not null信息作为columnoptionals,默认值为false,取值如下:

  • true:添加not null信息作为columnoptionals。

  • false(默认):不添加not null信息作为columnoptionals。

pretty-print

用于控制是否向JSON结构添加空格和缩进,进行格式化,默认值为false,取值如下:

  • true:向JSON结构添加空格和缩进,进行格式化。

  • false(默认):不向JSON结构添加空格和缩进,不进行格式化。

write-in-chunks

用于控制是否是每次change后都写,而不是每个changeset,默认值为false,取值如下:

  • true:每次change后都写,而不是每个changeset。

  • false(默认):每个changeset后都写,而不是每次change。

include-lsn

用于控制是否添加nextlsn到每一个changeset,默认值为false,取值如下:

  • true:添加nextlsn到每一个changeset。

  • false(默认):不添加nextlsn到每一个changeset。

filter-tables

排除指定表。默认为空,表示不过滤任何表。

说明
  • 通过逗号分隔不同的表,每张表需要指定schema。

  • *.foo表示所有schema中的表foo,而bar.*表示schema中的所有表。

  • 特殊字符(空格,单引号,逗号,句号,星号)必须以\转义。

  • Schema和表区分大小写。

  • Schema public下的表Foo bar应指定为public.Foo\bar

add-tables

指定解析特定表,默认解析所有schema下的所有表。用法和filter-tables相同。

filter-msg-prefixes

排除指定prefix的行,通常用于pg_logical_slot_peek_changes()函数中。默认为空,保证没有信息被过滤,通过逗号分隔。

add-msg-prefixes

增加指定prefix的行,通常用于pg_logical_slot_peek_changes()函数中。默认为所有prefix,通过逗号分隔,需要在该参数之前使用filter-msg-prefixes

format-version

定义使用哪种输出格式,默认值为1。取值如下:

  • 1:使用1的输出格式。

  • 2:使用2的输出格式。

actions

定义输出哪种操作。默认为所有(Insert,Update,Delete和Truncate)。如果您使用format-version 1,truncate将不会被启用。

示例

include-xids为例说明参数如何使用。

  1. 创建表和逻辑复制槽,并插入一行数据。

    DROP TABLE IF EXISTS tbl;
    CREATE TABLE tbl (id int);
    SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
    INSERT INTO tbl VALUES (1);
  2. 将参数名和参数内容依次填入函数。

    SELECT    
    count(*) = 1,    
    count(distinct ((data::json)->'xid')::text) = 1
    FROM pg_logical_slot_get_changes(    
    'regression_slot', NULL, NULL,    
    'format-version', '1',    
    'include-xids', '1');

原理设计

更多信息和原理设计请参见官方使用文档