本文为您介绍如何通过JDBC和Holo-Client这两种方式消费Hologres Binlog。

前提条件

  • 需提前开启和配置Hologres Binlog,详情请参见订阅Hologres Binlog
  • 需创建hg_binlog extension。
    • Hologres V2.0版本前,需要实例的Superuser执行以下语句创建extension才可以使用该功能,extension针对整个DB生效,一个DB只需执行一次,新建DB需要再次执行。
      --创建
      create extension hg_binlog;
      --删除
      DROP extension hg_binlog;
    • Hologres从 V2.0版本起,无需手动创建extension即可使用。
  • 需授予用户如下权限其中之一:
    • 实例的Superuser权限
    • 目标表的Owner权限及实例的Replication Role权限。

使用限制

  • 仅Hologres V1.1及以上版本支持通过JDBC消费Hologres Binlog,如果您的实例是V1.1以下版本,请您使用自助升级或加入实时数仓Hologres交流群(钉钉群号:32314975)申请升级实例。
  • 仅以下数据类型支持消费Hologres Binlog:INTEGER、BIGINT、SMALLINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、SERIAL、OID、int4[]、int8[]、float4[]、float8[]、boolean[]、text[],从HologresV1.3.36版本开始支持JSONB类型。如果表中有以上类型之外的数据类型,会造成消费失败。
    说明 Hologres从V1.3.36版本开始支持JSONB数据类型消费Hologres Binlog,消费之前需要开启如下GUC参数:
    -- Session级别开启GUC
    set hg_experimental_enable_binlog_jsonb = on;
    
    -- DB级别开启GUC
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • 同普通连接类似,在使用JDBC进行Binlog的消费时,所消费的每张表的每个Shard都会使用1个Walsender连接,Walsender连接与普通连接独立,互不影响。
  • Walsenders数也有使用上限,可以通过以下命令查看单个Frontend节点的最大Walsender数(V2.0版本起默认值调整为1000,V1.1.26至V2.0版本之间默认为100),总的数目需要乘实例的Frontend节点数,不同规格实例的Frontend节点数请参见实例规格概述
    show max_wal_senders;
    说明 Hologres实例支持的同时消费Binlog的表数量可以通过以下方式计算:表数量<=(max_wal_senders(100或1000) * FrontEnd节点数)/表的Shard Count
    例如:
    • 表a和表b的Shard Count都为20,表c的Shard Count为30,则三张表同时进行消费Binlog占用的Walsenders数量为20 + 20 + 30 = 70
    • 表a和表b的Shard Count都为20,表a同时有两个作业在进行Binlog消费,同时进行消费占用的Walsenders数量为20 *2 + 20 = 60
    • 一个实例有两个Frontend节点,则其最大Walsenders数为100*2 = 200,最多支持同时消费10张Shard Count为20的表同时进行消费Binlog。
    如果使用JDBC进行Binlog消费的连接数达到上限,会提示FATAL: sorry, too many wal senders already的错误信息,可以按照如下思路进行排查处理:
    1. 检查使用JDBC进行Binlog消费的作业,减少其中非必要的Binlog消费。
    2. 检查Table Group与Shard数设计是否合理,详情请参见Table Group设置最佳实践
    3. 如连接数仍超出限制,则须考虑扩容实例。
  • 只读从实例暂不支持使用JDBC消费Binlog。

Publication和Replication Slot

  • Publication
    • 简介:

      本质上是一组表,这些表的数据更改旨在通过逻辑复制进行表中数据复制,详细内容请参见Publication。当前Hologres支持的Publication只支持绑定一张物理表,且该表需要开启Binlog功能。

    • 创建Publication
      • 语法示例
        CREATE PUBLICATION name FOR TABLE table_name;
      • 参数说明
        参数说明
        name自定义Publication名称。
        table_name数据库中表名称。
      • 使用示例
        --示例创建一个名为hg_publication_test_1的Publication,且将表test_message_src添加至该Publication下
        create publication hg_publication_test_1 for table test_message_src;
    • 查询已经创建的Publication
      • 语法示例
        select * from pg_publication;
      • 查询结果
                pubname        | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
        -----------------------+----------+--------------+-----------+-----------+-----------+-------------
         hg_publication_test_1 |    16728 | f            | t         | t         | t         | t
        (1 row)
        参数说明
        pubnamePublication名称。
        pubownerPublication拥有者。
        puballtables绑定多个物理表,默认为False,目前暂不支持。
        pubinsert是否发布INSERT类型的Binlog,默认为True,Binlog类型请参考Binlog格式说明
        pubupdate是否发布UPDATE类型的Binlog,默认为True。
        pubdelete是否发布DELETE类型的Binlog,默认为True。
        pubtruncate是否发布TRUNCATE类型的Binlog,默认为True。
    • 查询Publication关联的表
      • 语法示例
        select * from pg_publication_tables;
      • 查询结果
                pubname        | schemaname |    tablename
        -----------------------+------------+------------------
         hg_publication_test_1 | public     | test_message_src
        (1 row) 
        参数说明
        pubnamePublication名称。
        schemaname表所属schema的名称。
        tablename表名称。
    • 删除Publication
      • 语法示例
        DROP PUBLICATION name;
        name为已创建的Publication名称。
      • 使用示例
        DROP PUBLICATION hg_publication_test_1;
  • Replication Slot
    • 简介:

      在逻辑复制场景下,一个Replication Slot表示一个数据的更改流,该Replication Slot也与当前消费进度绑定,用于断点续传,详细内容可以参见Postgres文档Replication Slot。Replicaiton Slot用于维护Binlog消费的点位信息,使得消费端Failover之后可以从之前已经Commit的点位进行恢复。

    • 权限说明
      只有Superuser和Replication Role拥有创建和使用Replication Slot的权限。可以通过执行如下语句创建或移除Replication Role。
      -- 使用superuser将普通用户设置为replication role:
      alter role <user_name> replication;
      
      -- 使用superuser将replication role设置回普通用户:
      alter role <user_name> noreplication;
      user_name为阿里云账号ID或RAM用户,详情请参见账号概述
    • 创建Replication Slot
      • 语法示例
        call hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
      • 参数说明
        参数说明
        replication_slot_name自定义Replication Slot的名称。
        hgoutputBinlog输出格式的插件,当前仅支持hgoutput内置插件。
        publication_nameReplication Slot所绑定的Publication名称。
      • 使用示例
        --创建一个名称为hg_replication_slot_1的Replication Slot,并且绑定名称为hg_publication_test_1的Publication。
        call hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
    • 查询已经创建的Replication Slot
      • 语法示例
        select * from hologres.hg_replication_slot_properties;
      • 查询结果
               slot_name       | property_key |    property_value
        -----------------------+--------------+-----------------------
         hg_replication_slot_1 | plugin       | hgoutput
         hg_replication_slot_1 | publication  | hg_publication_test_1
         hg_replication_slot_1 | parallelism  | 1
        (3 rows)
        参数说明
        slot_name Replication Slot名称。
        property_key包含如下三个参数。
        • plugin:Replication Slot使用的插件,目前只支持hgoutput。
        • publication:Replication Slot对应的Publication。
        • parallelism:Replication Slot的并发数。
        property_valueproperty_key包含参数对应的值。
    • 查询Replication Slot的并发数

      Hologres是一个分布式数仓,所以一张表的数据会分布在多个Shard上,所以使用JDBC消费Binlog的时候,需要启动多个客户端连接,才能消费到完整的Binlog数据。通过以下命令可以查询消费hg_replication_slot_1所需要的并发数。

      • 语法示例
        select hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
      • 查询结果
        hg_get_logical_replication_slot_parallelism  
        ------------------------------------------------
                                                20 
    • 查询Replication Slot的消费进度
      • 语法示例
        select * from hologres.hg_replication_progress;
      • 查询结果
               slot_name       | parallel_index | lsn 
        -----------------------+----------------+-----
         hg_replication_slot_1 |              0 |  66
         hg_replication_slot_1 |              1 | 122
         hg_replication_slot_1 |              2 | 119
        
        (0 rows)
        参数说明
        slot_name Replication Slot名称。
        parallel_index并发序号。
        lsn当前消费到最后的Binlog序号。
    • 删除Replication Slot
      • 语法示例
        call hg_drop_logical_replication_slot('<replication_slot_name>');
        replication_slot_name为已经创建的Replication Slot名称。
      • 使用示例
        call hg_drop_logical_replication_slot('hg_replication_slot_1');

使用JDBC消费Binlog

  1. 添加POM依赖
    使用如下语句添加POM依赖。
    说明 添加POM依赖,请使用42.2.18及以上版本的JDBC。
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.23</version>
    </dependency>
    <!-- 用于获取表shcema以及解析binlog -->
    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>holo-client</artifactId>
        <version>2.2.2</version>
    </dependency>
  2. Java代码示例
    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
    import com.alibaba.hologres.client.model.Record;
    import com.alibaba.hologres.client.model.TableSchema;
    
    import org.postgresql.PGConnection;
    import org.postgresql.PGProperty;
    import org.postgresql.replication.LogSequenceNumber;
    import org.postgresql.replication.PGReplicationStream;
    
    import java.nio.ByteBuffer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class Test {
    
        public static void main (String[] args) throws Exception {
    
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://Endpoint:Port/db_test";
    
            // 创建JDBC连接
            Properties properties = new Properties();
            PGProperty.USER.set(properties, username);
            PGProperty.PASSWORD.set(properties, password);
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
            // 消费Binlog,务必加上以下参数
            PGProperty.REPLICATION.set(properties, "database");
            try (Connection connection = DriverManager.getConnection(url, properties)) {
                // 创建PGReplicationStream并绑定Replicaiton slot,需要指定shardId
                int shardId = 0;
                PGConnection pgConnection = connection.unwrap(PGConnection.class);
                PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()
                        .logical()
                        .withSlotName("hg_replication_slot_1")
                        .withSlotOption("parallel_index", shardId)
                        .withSlotOption("batch_size", "1024")
                        .withSlotOption("start_time", "2021-01-01 00:00:00")
                        .withSlotOption("start_lsn","0")
                        .start();
    
                // 创建holo-client
                HoloConfig holoConfig = new HoloConfig();
                holoConfig.setJdbcUrl(url);
                holoConfig.setUsername(username);
                holoConfig.setPassword(password);
                HoloClient client = new HoloClient(holoConfig);
    
                // 创建Binlog decoder用于Decode binary数据,schema需要通过HoloClient获取
                TableSchema schema = client.getTableSchema("test_message_src", true);
                HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);
    
                // 消费数据
                ByteBuffer byteBuffer = pgReplicationStream.readPending();
                while (true) {
                    if (byteBuffer != null) {
                        List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
                        Long latestLsn = 0L;
                        for (BinlogRecord record : records) {
                            latestLsn = record.getBinlogLsn();
                            // Do Something
                            System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));
                        }
                        // Commit Binlog 点位信息
                        pgReplicationStream.setFlushedLSN(LogSequenceNumber.valueOf(latestLsn));
                        pgReplicationStream.forceUpdateStatus();
                    }
                    byteBuffer = pgReplicationStream.readPending();
                }
            }
            // pgReplicationStream.close();
            // connection.close();
        }
                            
    创建PGReplicationStream时的withSlotOption可以指定如下参数。
    参数是否必须说明
    withSlotName指定Replication Slot的名称
    parallel_indexReplication Slot的并发数序号,表示PGReplicationStream消费的是第几个并发的数据,也即当前表对应Shard的数据。假设某个Replication Slot的并发数是3,则用户最多可以创建3个PGReplicationStream,每个PGReplicationStream的parallel_index参数分别是0、1、2。当前Hologres Binlog并不支持类似Kafka Consumer Group的实现,所以需要用户自己创建多个PGReplicationStream。
    start_time表示从某个时间点位开始消费Binlog,示例参数格式为:2021-01-01 12:00:00+08。
    如果未指定此参数,分为如下两种情况:
    • 第一次开始消费Replication Slot的Binlog,则从头开始消费,类似Kafka的Oldest。
    • 曾经消费过Replication Slot的Binlog,则尝试从之前Commit过的点位开始消费。
    start_lsn表示从某个lsn之后开始消费binlog,同时设置优先级高于start_time
    batch_size单次获取的Binlog最大批大小,单位为行,默认值为1024。
    说明
    • BinlogRecord 是decoder返回的Record类型,可以通过以下接口获取这条数据对应的binlog系统字段,详情见订阅Hologres Binlog
      • getBinlogLsn() 获取binlog的序号。
      • getBinlogTimestamp() 获取Binlog的系统时间戳。
      • getBinlogEventType() 获取Binlog的事件类型。
    • 消费Binlog之后,用户需要手动Commit点位信息,确保下次Failover能够恢复。

使用Holo Client消费Binlog

  • 消费Hologres Binlog功能已经集成至Holo Client中,您可以通过指定需要消费的物理表,方便的消费所有parallel_index的Binlog数据。
  • 使用Holo Client消费Binlog,需要占用与物理表shard数(slot并发数)相同的连接数,请保证连接数充足。
  • Holo Client消费binlog支持断点续传,由于网络连接失败等原因消费终止,在重新消费时会使用保存在hologres.hg_replication_progress表的消费进度进行恢复。因此消费时每张表的每个slot,只有一个作业在消费。
  • Holo Client不会自动保存消费点位,需要用户调用commit方法,详情请参见下方代码示例。
  1. 添加POM依赖
    使用如下语句添加POM依赖。
    说明 添加POM依赖,请使用2.1.0及以上版本的Holo Client。
    <dependency>
      <groupId>com.alibaba.hologres</groupId>
      <artifactId>holo-client</artifactId>
      <version>2.2.7</version>
    </dependency>
  2. Java代码示例
    import com.alibaba.hologres.client.BinlogShardGroupReader;
    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.Subscribe;
    import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
    import com.alibaba.hologres.client.model.binlog.BinlogRecord;
    
    public class HoloBinlogExample {
    
        public static void main(String[] args) throws Exception {
    
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://ip:port/database";
            String tableName = "test_message_src";
            String slotName = "hg_replication_slot_1";
    
            // 创建client的参数
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            holoConfig.setBinlogReadBatchSize(128);
            holoConfig.setBinlogIgnoreDelete(true);
            holoConfig.setBinlogIgnoreBeforeUpdate(true);
            holoConfig.setBinlogHeartBeatIntervalMs(5000L);
    
            HoloClient client = new HoloClient(holoConfig);
    
            // 消费binlog的请求,tableName和slotname为必要参数,Subscribe有StartTimeBuilder和OffsetBuilder两种,此处以前者为例
            Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
                    .setBinlogReadStartTime("2021-01-01 12:00:00")
                    .build();
    
            // 创建binlog reader
            BinlogShardGroupReader reader = client.binlogSubscribe(subscribe);
    
            BinlogRecord record;
            long count = 0;
            while ((record = reader.getBinlogRecord()) != null) {
                // 消费到最新
                if (record instanceof BinlogHeartBeatRecord) {
                    // do something
                    continue;
                }
    
                // 每1000条数据保存一次消费点位,可以自行选择条件,比如按时间周期等等
                if (count % 1000 == 0) {
                    // 保存消费点位,参数表示超时时间,单位为ms
                    reader.commit(5000L);
                }
    
                //handle record
                count++;
            }
        }
    }
                            
    (可选)如需要,可以用OffsetBuilder创建Subscribe,从而为每个Shard指定起始消费点位。
    // 此处shardCount为示例,请替换为所消费表对应的实际数量
    int shardCount = 10;
    Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
    for (int i = 0; i < shardCount; i++) {
        // BinlogOffset通过setSequence指定lsn,通过setTimestamp指定时间,两者同时指定lsn优先级大于时间戳
        subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(0).setTimestamp("2021-01-01 12:00:00+08"));
    }
    Subscribe subscribe = subscribeBuilder.build();
    BinlogShardGroupReader reader = client.binlogSubscribe(subscribe);
    使用Holo Client消费Binlog时可以指定如下参数。
    参数是否必须默认值说明
    binlogReadBatchSize1024从每个Shard单次获取的Binlog最大批次大小,单位为行。
    binlogHeartBeatIntervalMs-1binlogRead 发送BinlogHeartBeatRecord的间隔。-1表示不发送

    当binlog没有新数据,每间隔binlogHeartBeatIntervalMs会下发一条BinlogHeartBeatRecord,此record的timestamp表示截止到这个时间的数据都已经消费完成。

    binlogIgnoreDeletefalse是否忽略Delete类型的Binlog。
    binlogIgnoreBeforeUpdatefalse是否忽略BeforeUpdate类型的Binlog。
    retryCount3消费失败时的重试次数,成功消费时重试次数会被重置。