使用Replication Slot创建数据订阅

PostgreSQL原生支持使用Replication Slot开启数据订阅(CDC,Change Data Capture),本文介绍在RDS PostgreSQL实例上开启数据订阅的配置步骤。

前提条件

注意事项

  • 仅支持在主实例上开启数据订阅并消费,不支持只读实例。

  • 阿里云RDS PostgreSQL支持Logical Replication Slot Failover,主备切换操作不会影响数据订阅。更多介绍,请参见逻辑复制槽故障转移(Logical Replication Slot Failover)

  • 开启数据订阅前需要修改RDS PostgreSQL实例参数,该操作会触发实例重启,请在业务低峰期进行修改,避免对业务造成影响。

  • 开启了数据订阅的RDS PostgreSQL实例,需要注意:

    • 需要使用更多的WAL日志存储空间,如果消费数据订阅异常或停止,WAL日志不会自动清理,会导致日志堆积,占满整个磁盘空间,此时,实例将存在被锁定的风险,锁定后实例将变为只读状态,禁止写入数据。

    • 下游消费要实时上报位点,否则实例会一直保留旧版本的数据行。如果旧版本一直保留,会导致实例事务ID回卷,整个实例无法写入。错误日志示例:

      “HINT: Close open transactions soon to avoid wraparound problems. You might also need to commit or roll back old prepared transactions, or drop stale replication slots.”
      “WARNING: oldest xmin is far in the past.”

    您可以手动删除Replication Slot来让RDS PostgreSQL内核自动清理WAL日志、旧版本数据行。具体操作,请参见关闭数据订阅

    说明

    建议定期监控实例WAL日志大小和实例磁盘空间使用率,设置相关报警。更多信息,请参见查看增强监控管理报警

开启数据订阅

步骤一:创建测试数据库

  1. 访问RDS实例列表,在上方选择地域,然后单击目标实例ID。
  2. 在左侧导航栏中选择数据库管理

  3. 单击创建数据库创建测试数据库

    说明

    本示例以创建数据库testdb为例。创建数据库的具体操作步骤,请参见创建数据库

步骤二:创建测试账号并配置权限

  1. 在左侧导航栏中选择账号管理

  2. 单击创建账号,分别创建管理员高权限账号(db_admin)和数据订阅普通账号(cdc_user)。创建用户

    说明

    本示例创建的账号名仅为示例,您可以根据实际情况自定义名称。创建账号的具体操作步骤,请参见创建账号

  3. 使用db_admin账号连接RDS PostgreSQL实例。

    psql -h <RDS PostgreSQL实例连接地址> -p 5432 -U db_admin -d testdb
    说明

    获取RDS PostgreSQL实例连接地址,请参见查看或修改连接地址和端口

  4. 执行如下命令,将cdc_user账号添加到Replication角色中。

    ALTER USER cdc_user WITH REPLICATION;

    您可以通过如下命令查询修改结果:

    SELECT rolreplication FROM  pg_roles WHERE rolname='cdc_user';

    查询结果示例:

     rolreplication
    ----------------
     t
    (1 row)
  5. 执行如下命令为cdc_user账号授权。

    GRANT SELECT ON ALL TABLES IN SCHEMA PUBLIC to cdc_user;

步骤三:调整RDS PostgreSQL实例参数

  1. 执行如下命令,查询实例参数设置。

    SELECT name,
           setting,
           short_desc,
           source
    FROM pg_settings
    WHERE name ='wal_level';

    查询结果示例:

             name          | setting |                               short_desc                                |       source
    -----------------------+---------+-------------------------------------------------------------------------+--------------------
     wal_level             | replica | Sets the level of information written to the WAL.                       | configuration file
    (1 rows)

    wal_level参数控制写入到WAL日志中的数据量,默认值是replica,只能在服务器启动时设置。取值范围:

    • minimal:仅记录从崩溃或立即关机状态中恢复实例所需的数据,不支持通过基础备份和WAL日志恢复数据库。

    • replica:写入足够的数据以支持WAL归档和复制,包括在standby上运行只读查询。

    • logical:增加逻辑解码所需的信息。

  2. 访问RDS实例列表,在上方选择地域,然后单击目标实例ID。
  3. 在左侧导航栏中选择参数设置

  4. wal_level参数取值修改为logical

    说明
    • 修改实例参数的具体方法,请参见设置实例参数

    • 修改实例参数并提交后,RDS PostgreSQL实例将重启,请在业务低峰期修改实例参数,避免对业务造成影响。

步骤四:创建Logical Replication Slot

说明

步骤三修改了实例参数,请在RDS PostgreSQL实例重启完成后,实例状态为运行中时再进行本步骤操作。

  1. 使用db_admin账号连接RDS PostgreSQL实例。

    psql -h <RDS PostgreSQL实例连接地址> -p 5432 -U db_admin -d testdb
  2. 执行如下命令,使用输出插件test_decoding创建一个名为cdc_replication_slot的Replication Slot。

    SELECT pg_create_logical_replication_slot('cdc_replication_slot', 'test_decoding');
    说明
    • cdc_replication_slot仅为示例,您可以根据实际情况自定义名称。

    • test_decoding为PostgreSQL原生提供的输出插件,无需修改。

    执行结果示例:

     pg_create_logical_replication_slot
    ------------------------------------
     (cdc_replication_slot,1/14003428)
    (1 row)

    您可以通过如下命令查询创建结果:

    SELECT * FROM pg_replication_slots;

    查询结果示例:

          slot_name       |    plugin     | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size  | two_phase
    ----------------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+----------------+-----------
     cdc_replication_slot | test_decoding | logical   |  18822 | testdb   | f         | f      |            |      |        22356 | 1/140033F0  | 1/14003428          | reserved   |                | f
    (1 row)

步骤五:创建测试数据

执行如下命令,创建测试数据,模拟生产环境。

CREATE TABLE public.tb_test(
    id int NOT NULL PRIMARY KEY
);

ALTER TABLE public.tb_test ADD name varchar(1) NULL;

INSERT INTO public.tb_test SELECT 1, 'A';

步骤六:客户端读取数据

  1. 使用cdc_user账号连接RDS PostgreSQL实例。

    psql -h <RDS PostgreSQL实例连接地址> -p 5432 -U cdc_user -d testdb
  2. 执行如下命令,从Replication Slot中读取数据。

    SELECT * FROM pg_logical_slot_peek_changes('cdc_replication_slot', null, null);

    查询结果示例:

        lsn     |  xid  |                                  data
    ------------+-------+-------------------------------------------------------------------------
     1/14003D90 | 22376 | BEGIN 22376
     1/1401DDE8 | 22376 | COMMIT 22376
     1/1401DDE8 | 22377 | BEGIN 22377
     1/1401E100 | 22377 | COMMIT 22377
     1/1401E2A8 | 22382 | BEGIN 22382
     1/1401E2A8 | 22382 | table public.tb_test: INSERT: id[integer]:1 name[character varying]:'A'
     1/1401E3C0 | 22382 | COMMIT 22382
    (7 rows)

步骤七:客户端消费订阅数据

  1. 执行\q命令退出数据库连接。

  2. 执行如下命令,消费订阅数据。

    说明

    pg_recvlogical命令需要在postgres用户下执行,您可以使用su - postgres命令切换用户,如果提示-bash: pg_recvlogical: command not found,解决方法,请参见常见问题

    pg_recvlogical -h <RDS PostgreSQL实例连接地址> -U <高权限账号> -d <测试数据库> --create-slot --if-not-exists --slot=cdc_replication_slot --plugin=test_decoding --start -f -

    命令示例:

    pg_recvlogical -h pgm-*****.pgsql.singapore.rds.aliyuncs.com -U db_admin -d testdb --create-slot --if-not-exists --slot=cdc_replication_slot --plugin=test_decoding --start -f -

    结果示例:

    BEGIN 22376
    COMMIT 22376
    BEGIN 22377
    COMMIT 22377
    BEGIN 22382
    table public.tb_test: INSERT: id[integer]:1 name[character varying]:'A'
    COMMIT 22382

关闭数据订阅

开启了数据订阅的RDS PostgreSQL实例,需要使用更多的WAL日志存储空间,如果消费数据订阅异常或停止,WAL日志不会自动清理,会导致日志堆积,占满整个磁盘空间,此时,实例将存在被锁定的风险,锁定后实例将变为只读状态,禁止写入数据。您可以手动删除非活跃的Replication Slot来让RDS PostgreSQL内核自动清理WAL日志。

RDS PostgreSQL支持通过控制台、API或SQL命令的方式删除非活跃的Replication Slot,具体方法如下:

  • 控制台删除:WAL日志管理

  • API:DeleteSlot

  • SQL命令:

    1. 使用db_admin账号连接RDS PostgreSQL实例。

      psql -h <RDS PostgreSQL实例连接地址> -p 5432 -U db_admin -d testdb
    2. 执行如下命令,查看Inactive slot的名称及相关信息。

      SELECT slot_name, slot_type, database, active, safe_wal_size
      FROM pg_replication_slots
      WHERE active = 'f';

      查询示例如下:

            slot_name       | slot_type | database | active | safe_wal_size
      ----------------------+-----------+----------+--------+---------------
       cdc_replication_slot | logical   | testdb   | f      |
      (1 row)
    3. 执行如下命令,删除Logical Replication Slot。

      SELECT pg_drop_replication_slot('cdc_replication_slot');

常见问题

  • Q:客户端消费数据时提示-bash: pg_recvlogical: command not found,如何处理?

    A:pg_recvlogical是PostgreSQL的原生逻辑解码工具,该工具使用默认的test_decoding插件,此插件位于PostgreSQL源码包的contrib/test_decoding目录下,建议对PostgreSQL源码进行编译安装,然后在客户端安装目录的/bin目录下,即可查看到pg_recvlogical工具。如何从源码安装PostgreSQL,请参见Installation from Source Code

  • Q:已经手动删除了Replication Slot,但是WAL日志并未自动清除,仍占用磁盘空间,如何处理?

    A:您可以调整参数wal_keep_segments取值为默认值128,减少文件保存个数。修改参数的具体方法,请参见设置实例参数