本文为您介绍如何通过JDBC和Holo-Client这两种方式消费Hologres Binlog。
前提条件
- 需提前开启和配置Hologres Binlog,详情请参见订阅Hologres Binlog。
- 需创建hg_binlog extension。
- Hologres V2.0版本前,需要实例的Superuser执行以下语句创建extension才可以使用该功能,extension针对整个DB生效,一个DB只需执行一次,新建DB需要再次执行。
--创建 create extension hg_binlog; --删除 DROP extension hg_binlog;
- Hologres从 V2.0版本起,无需手动创建extension即可使用。
- Hologres V2.0版本前,需要实例的Superuser执行以下语句创建extension才可以使用该功能,extension针对整个DB生效,一个DB只需执行一次,新建DB需要再次执行。
- 需授予用户如下权限其中之一:
- 实例的Superuser权限
- 目标表的Owner权限及实例的Replication Role权限。
使用限制
- 仅Hologres V1.1及以上版本支持通过JDBC消费Hologres Binlog,如果您的实例是V1.1以下版本,请您使用自助升级或加入实时数仓Hologres交流群(钉钉群号:32314975)申请升级实例。
- 仅以下数据类型支持消费Hologres Binlog:INTEGER、BIGINT、SMALLINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、SERIAL、OID、int4[]、int8[]、float4[]、float8[]、boolean[]、text[],从HologresV1.3.36版本开始支持JSONB类型。如果表中有以上类型之外的数据类型,会造成消费失败。说明 Hologres从V1.3.36版本开始支持JSONB数据类型消费Hologres Binlog,消费之前需要开启如下GUC参数:
-- Session级别开启GUC set hg_experimental_enable_binlog_jsonb = on; -- DB级别开启GUC alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
- 同普通连接类似,在使用JDBC进行Binlog的消费时,所消费的每张表的每个Shard都会使用1个Walsender连接,Walsender连接与普通连接独立,互不影响。
- Walsenders数也有使用上限,可以通过以下命令查看单个Frontend节点的最大Walsender数(V2.0版本起默认值调整为1000,V1.1.26至V2.0版本之间默认为100),总的数目需要乘实例的Frontend节点数,不同规格实例的Frontend节点数请参见实例规格概述。
show max_wal_senders;
说明 Hologres实例支持的同时消费Binlog的表数量可以通过以下方式计算:表数量<=(max_wal_senders(100或1000) * FrontEnd节点数)/表的Shard Count
。例如:- 表a和表b的Shard Count都为20,表c的Shard Count为30,则三张表同时进行消费Binlog占用的Walsenders数量为
20 + 20 + 30 = 70
。 - 表a和表b的Shard Count都为20,表a同时有两个作业在进行Binlog消费,同时进行消费占用的Walsenders数量为
20 *2 + 20 = 60
。 - 一个实例有两个Frontend节点,则其最大Walsenders数为
100*2 = 200
,最多支持同时消费10张Shard Count为20的表同时进行消费Binlog。
FATAL: sorry, too many wal senders already
的错误信息,可以按照如下思路进行排查处理:- 检查使用JDBC进行Binlog消费的作业,减少其中非必要的Binlog消费。
- 检查Table Group与Shard数设计是否合理,详情请参见Table Group设置最佳实践。
- 如连接数仍超出限制,则须考虑扩容实例。
- 表a和表b的Shard Count都为20,表c的Shard Count为30,则三张表同时进行消费Binlog占用的Walsenders数量为
- 只读从实例暂不支持使用JDBC消费Binlog。
Publication和Replication Slot
- Publication
- 简介:
本质上是一组表,这些表的数据更改旨在通过逻辑复制进行表中数据复制,详细内容请参见Publication。当前Hologres支持的Publication只支持绑定一张物理表,且该表需要开启Binlog功能。
- 创建Publication
- 语法示例
CREATE PUBLICATION name FOR TABLE table_name;
- 参数说明
参数 说明 name 自定义Publication名称。 table_name 数据库中表名称。 - 使用示例
--示例创建一个名为hg_publication_test_1的Publication,且将表test_message_src添加至该Publication下 create publication hg_publication_test_1 for table test_message_src;
- 语法示例
- 查询已经创建的Publication
- 语法示例
select * from pg_publication;
- 查询结果
pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate -----------------------+----------+--------------+-----------+-----------+-----------+------------- hg_publication_test_1 | 16728 | f | t | t | t | t (1 row)
参数 说明 pubname Publication名称。 pubowner Publication拥有者。 puballtables 绑定多个物理表,默认为False,目前暂不支持。 pubinsert 是否发布INSERT类型的Binlog,默认为True,Binlog类型请参考Binlog格式说明。 pubupdate 是否发布UPDATE类型的Binlog,默认为True。 pubdelete 是否发布DELETE类型的Binlog,默认为True。 pubtruncate 是否发布TRUNCATE类型的Binlog,默认为True。
- 语法示例
- 查询Publication关联的表
- 语法示例
select * from pg_publication_tables;
- 查询结果
pubname | schemaname | tablename -----------------------+------------+------------------ hg_publication_test_1 | public | test_message_src (1 row)
参数 说明 pubname Publication名称。 schemaname 表所属schema的名称。 tablename 表名称。
- 语法示例
- 删除Publication
- 语法示例
name为已创建的Publication名称。DROP PUBLICATION name;
- 使用示例
DROP PUBLICATION hg_publication_test_1;
- 语法示例
- 简介:
- Replication Slot
- 简介:
在逻辑复制场景下,一个Replication Slot表示一个数据的更改流,该Replication Slot也与当前消费进度绑定,用于断点续传,详细内容可以参见Postgres文档Replication Slot。Replicaiton Slot用于维护Binlog消费的点位信息,使得消费端Failover之后可以从之前已经Commit的点位进行恢复。
- 权限说明只有Superuser和Replication Role拥有创建和使用Replication Slot的权限。可以通过执行如下语句创建或移除Replication Role。
user_name为阿里云账号ID或RAM用户,详情请参见账号概述。-- 使用superuser将普通用户设置为replication role: alter role <user_name> replication; -- 使用superuser将replication role设置回普通用户: alter role <user_name> noreplication;
- 创建Replication Slot
- 语法示例
call hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
- 参数说明
参数 说明 replication_slot_name 自定义Replication Slot的名称。 hgoutput Binlog输出格式的插件,当前仅支持hgoutput内置插件。 publication_name Replication Slot所绑定的Publication名称。 - 使用示例
--创建一个名称为hg_replication_slot_1的Replication Slot,并且绑定名称为hg_publication_test_1的Publication。 call hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
- 语法示例
- 查询已经创建的Replication Slot
- 语法示例
select * from hologres.hg_replication_slot_properties;
- 查询结果
slot_name | property_key | property_value -----------------------+--------------+----------------------- hg_replication_slot_1 | plugin | hgoutput hg_replication_slot_1 | publication | hg_publication_test_1 hg_replication_slot_1 | parallelism | 1 (3 rows)
参数 说明 slot_name Replication Slot名称。 property_key 包含如下三个参数。 - plugin:Replication Slot使用的插件,目前只支持hgoutput。
- publication:Replication Slot对应的Publication。
- parallelism:Replication Slot的并发数。
property_value property_key包含参数对应的值。
- 语法示例
- 查询Replication Slot的并发数
Hologres是一个分布式数仓,所以一张表的数据会分布在多个Shard上,所以使用JDBC消费Binlog的时候,需要启动多个客户端连接,才能消费到完整的Binlog数据。通过以下命令可以查询消费hg_replication_slot_1所需要的并发数。
- 语法示例
select hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
- 查询结果
hg_get_logical_replication_slot_parallelism ------------------------------------------------ 20
- 语法示例
- 查询Replication Slot的消费进度
- 语法示例
select * from hologres.hg_replication_progress;
- 查询结果
slot_name | parallel_index | lsn -----------------------+----------------+----- hg_replication_slot_1 | 0 | 66 hg_replication_slot_1 | 1 | 122 hg_replication_slot_1 | 2 | 119 (0 rows)
参数 说明 slot_name Replication Slot名称。 parallel_index 并发序号。 lsn 当前消费到最后的Binlog序号。
- 语法示例
- 删除Replication Slot
- 语法示例
replication_slot_name为已经创建的Replication Slot名称。call hg_drop_logical_replication_slot('<replication_slot_name>');
- 使用示例
call hg_drop_logical_replication_slot('hg_replication_slot_1');
- 语法示例
- 简介:
使用JDBC消费Binlog
- 添加POM依赖使用如下语句添加POM依赖。说明 添加POM依赖,请使用42.2.18及以上版本的JDBC。
<dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.2.23</version> </dependency> <!-- 用于获取表shcema以及解析binlog --> <dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>holo-client</artifactId> <version>2.2.2</version> </dependency>
- Java代码示例
创建PGReplicationStream时的withSlotOption可以指定如下参数。import com.alibaba.hologres.client.HoloClient; import com.alibaba.hologres.client.HoloConfig; import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder; import com.alibaba.hologres.client.model.Record; import com.alibaba.hologres.client.model.TableSchema; import org.postgresql.PGConnection; import org.postgresql.PGProperty; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.DriverManager; import java.util.Arrays; import java.util.List; import java.util.Properties; public class Test { public static void main (String[] args) throws Exception { String username = ""; String password = ""; String url = "jdbc:postgresql://Endpoint:Port/db_test"; // 创建JDBC连接 Properties properties = new Properties(); PGProperty.USER.set(properties, username); PGProperty.PASSWORD.set(properties, password); PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); // 消费Binlog,务必加上以下参数 PGProperty.REPLICATION.set(properties, "database"); try (Connection connection = DriverManager.getConnection(url, properties)) { // 创建PGReplicationStream并绑定Replicaiton slot,需要指定shardId int shardId = 0; PGConnection pgConnection = connection.unwrap(PGConnection.class); PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream() .logical() .withSlotName("hg_replication_slot_1") .withSlotOption("parallel_index", shardId) .withSlotOption("batch_size", "1024") .withSlotOption("start_time", "2021-01-01 00:00:00") .withSlotOption("start_lsn","0") .start(); // 创建holo-client HoloConfig holoConfig = new HoloConfig(); holoConfig.setJdbcUrl(url); holoConfig.setUsername(username); holoConfig.setPassword(password); HoloClient client = new HoloClient(holoConfig); // 创建Binlog decoder用于Decode binary数据,schema需要通过HoloClient获取 TableSchema schema = client.getTableSchema("test_message_src", true); HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema); // 消费数据 ByteBuffer byteBuffer = pgReplicationStream.readPending(); while (true) { if (byteBuffer != null) { List<BinlogRecord> records = decoder.decode(shardId, byteBuffer); Long latestLsn = 0L; for (BinlogRecord record : records) { latestLsn = record.getBinlogLsn(); // Do Something System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues())); } // Commit Binlog 点位信息 pgReplicationStream.setFlushedLSN(LogSequenceNumber.valueOf(latestLsn)); pgReplicationStream.forceUpdateStatus(); } byteBuffer = pgReplicationStream.readPending(); } } // pgReplicationStream.close(); // connection.close(); }
参数 是否必须 说明 withSlotName 是 指定Replication Slot的名称 parallel_index 是 Replication Slot的并发数序号,表示PGReplicationStream消费的是第几个并发的数据,也即当前表对应Shard的数据。假设某个Replication Slot的并发数是3,则用户最多可以创建3个PGReplicationStream,每个PGReplicationStream的parallel_index参数分别是0、1、2。当前Hologres Binlog并不支持类似Kafka Consumer Group的实现,所以需要用户自己创建多个PGReplicationStream。 start_time 否 表示从某个时间点位开始消费Binlog,示例参数格式为:2021-01-01 12:00:00+08。 如果未指定此参数,分为如下两种情况:- 第一次开始消费Replication Slot的Binlog,则从头开始消费,类似Kafka的Oldest。
- 曾经消费过Replication Slot的Binlog,则尝试从之前Commit过的点位开始消费。
start_lsn 否 表示从某个lsn之后开始消费binlog,同时设置优先级高于start_time batch_size 否 单次获取的Binlog最大批大小,单位为行,默认值为1024。 说明- BinlogRecord 是decoder返回的Record类型,可以通过以下接口获取这条数据对应的binlog系统字段,详情见订阅Hologres Binlog。
- getBinlogLsn() 获取binlog的序号。
- getBinlogTimestamp() 获取Binlog的系统时间戳。
- getBinlogEventType() 获取Binlog的事件类型。
- 消费Binlog之后,用户需要手动Commit点位信息,确保下次Failover能够恢复。
使用Holo Client消费Binlog
- 消费Hologres Binlog功能已经集成至Holo Client中,您可以通过指定需要消费的物理表,方便的消费所有parallel_index的Binlog数据。
- 使用Holo Client消费Binlog,需要占用与物理表shard数(slot并发数)相同的连接数,请保证连接数充足。
- Holo Client消费binlog支持断点续传,由于网络连接失败等原因消费终止,在重新消费时会使用保存在hologres.hg_replication_progress表的消费进度进行恢复。因此消费时每张表的每个slot,只有一个作业在消费。
- Holo Client不会自动保存消费点位,需要用户调用commit方法,详情请参见下方代码示例。
- 添加POM依赖使用如下语句添加POM依赖。说明 添加POM依赖,请使用2.1.0及以上版本的Holo Client。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>holo-client</artifactId> <version>2.2.7</version> </dependency>
- Java代码示例
(可选)如需要,可以用OffsetBuilder创建Subscribe,从而为每个Shard指定起始消费点位。import com.alibaba.hologres.client.BinlogShardGroupReader; import com.alibaba.hologres.client.HoloClient; import com.alibaba.hologres.client.HoloConfig; import com.alibaba.hologres.client.Subscribe; import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord; import com.alibaba.hologres.client.model.binlog.BinlogRecord; public class HoloBinlogExample { public static void main(String[] args) throws Exception { String username = ""; String password = ""; String url = "jdbc:postgresql://ip:port/database"; String tableName = "test_message_src"; String slotName = "hg_replication_slot_1"; // 创建client的参数 HoloConfig holoConfig = new HoloConfig(); holoConfig.setJdbcUrl(url); holoConfig.setUsername(username); holoConfig.setPassword(password); holoConfig.setBinlogReadBatchSize(128); holoConfig.setBinlogIgnoreDelete(true); holoConfig.setBinlogIgnoreBeforeUpdate(true); holoConfig.setBinlogHeartBeatIntervalMs(5000L); HoloClient client = new HoloClient(holoConfig); // 消费binlog的请求,tableName和slotname为必要参数,Subscribe有StartTimeBuilder和OffsetBuilder两种,此处以前者为例 Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName) .setBinlogReadStartTime("2021-01-01 12:00:00") .build(); // 创建binlog reader BinlogShardGroupReader reader = client.binlogSubscribe(subscribe); BinlogRecord record; long count = 0; while ((record = reader.getBinlogRecord()) != null) { // 消费到最新 if (record instanceof BinlogHeartBeatRecord) { // do something continue; } // 每1000条数据保存一次消费点位,可以自行选择条件,比如按时间周期等等 if (count % 1000 == 0) { // 保存消费点位,参数表示超时时间,单位为ms reader.commit(5000L); } //handle record count++; } } }
使用Holo Client消费Binlog时可以指定如下参数。// 此处shardCount为示例,请替换为所消费表对应的实际数量 int shardCount = 10; Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName); for (int i = 0; i < shardCount; i++) { // BinlogOffset通过setSequence指定lsn,通过setTimestamp指定时间,两者同时指定lsn优先级大于时间戳 subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(0).setTimestamp("2021-01-01 12:00:00+08")); } Subscribe subscribe = subscribeBuilder.build(); BinlogShardGroupReader reader = client.binlogSubscribe(subscribe);
参数 是否必须 默认值 说明 binlogReadBatchSize 否 1024 从每个Shard单次获取的Binlog最大批次大小,单位为行。 binlogHeartBeatIntervalMs 否 -1 binlogRead 发送BinlogHeartBeatRecord的间隔。-1表示不发送 当binlog没有新数据,每间隔binlogHeartBeatIntervalMs会下发一条BinlogHeartBeatRecord,此record的timestamp表示截止到这个时间的数据都已经消费完成。
binlogIgnoreDelete 否 false 是否忽略Delete类型的Binlog。 binlogIgnoreBeforeUpdate 否 false 是否忽略BeforeUpdate类型的Binlog。 retryCount 否 3 消费失败时的重试次数,成功消费时重试次数会被重置。