本文为您介绍如何通过JDBC和Holo-Client这两种方式消费Hologres Binlog。
前提条件
需提前开启和配置Hologres Binlog,详情请参见订阅Hologres Binlog。
需创建hg_binlog extension。
Hologres V2.0版本前,需要实例的Superuser执行以下语句创建Extension才可以使用该功能,Extension针对整个DB生效,一个DB只需执行一次,新建DB需要再次执行。
--创建 CREATE extension hg_binlog; --删除 DROP extension hg_binlog;
重要不推荐使用
DROP EXTENSION <extension_name> CASCADE;
命令级联卸载Extension。CASCADE(级联)删除命令不仅会删除指定扩展本身,还会一并清除扩展数据(例如PostGIS数据、RoaringBitmap数据、Proxima数据、Binlog数据、BSI数据等)以及依赖该扩展的对象(包括元数据、表、视图、Server数据等)。Hologres从 V2.0版本起,无需手动创建extension即可使用。
Hologres V2.1版本起,支持通过如下两种方式进行Binlog消费。
全部版本支持:完成准备工作,包括为目标表创建Publication、为Publication创建Replication Slot后,直接进行目标表的Binlog消费。
说明该方法需授予用户如下权限其中之一:
实例的Superuser权限
目标表的Owner权限、CREATE DATABASE权限及实例的Replication Role权限。
仅Hologres V2.1版本起支持:为用户授予目标表的读权限,然后进行目标表的Binlog消费。
使用限制
仅Hologres V1.1及以上版本支持通过JDBC消费Hologres Binlog,如果您的实例是V1.1以下版本,请您使用自助升级或加入Hologres钉钉交流群反馈,详情请参见如何获取更多的在线支持?。
仅以下数据类型支持消费Hologres Binlog:INTEGER、BIGINT、SMALLINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、SERIAL、OID、int4[]、int8[]、float4[]、float8[]、boolean[]、text[],从HologresV1.3.36版本开始支持JSONB类型。如果表中有以上类型之外的数据类型,会造成消费失败。
说明Hologres从V1.3.36版本开始支持JSONB数据类型消费Hologres Binlog,消费之前需要开启如下GUC参数:
-- Session级别开启GUC SET hg_experimental_enable_binlog_jsonb = ON; -- DB级别开启GUC ALTER database <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
同普通连接类似,使用JDBC进行Binlog的消费时,所消费的每张表的每个Shard都会使用1个Walsender连接,Walsender连接与普通连接独立,互不影响。
Walsenders数也有使用上限,可以通过以下命令查看单个Frontend节点的最大Walsender数(V2.2版本起默认值调整为600,V2.0和V2.1版本默认为1000,V1.1.26至V2.0版本之间默认为100),总的数目需要乘实例的Frontend节点数,不同规格实例的Frontend节点数请参见实例规格概述。
SHOW max_wal_senders;
说明Hologres实例支持的同时消费Binlog的表数量可以通过以下方式计算:
表数量<=(max_wal_senders(100或1000) * FrontEnd节点数)/表的Shard Count
。例如:
表a和表b的Shard Count都为20,表c的Shard Count为30,则三张表同时进行消费Binlog占用的Walsenders数量为
20 + 20 + 30 = 70
。表a和表b的Shard Count都为20,表a同时有两个作业在进行Binlog消费,同时进行消费占用的Walsenders数量为
20 *2 + 20 = 60
。一个实例有两个Frontend节点,则其最大Walsenders数为
100*2 = 200
,最多支持同时消费10张Shard Count为20的表同时进行消费Binlog。
如果使用JDBC进行Binlog消费的连接数达到上限,会提示
FATAL: sorry, too many wal senders already
的错误信息,可以按照如下思路进行排查处理:检查使用JDBC进行Binlog消费的作业,减少其中非必要的Binlog消费。
检查Table Group与Shard数设计是否合理,详情请参见Table Group设置最佳实践。
如连接数仍超出限制,则须考虑扩容实例。
Hologres V2.0.18版本前,只读从实例不支持通过JDBC消费Binlog功能。从V2.0.18版本起,只读从实例支持通过JDBC消费Hologres Binlog,但不支持记录消费进度。
注意事项
Hologres实例版本和Flink引擎版本不同支持消费Binlog的方式也不同,说明如下:
Hologres实例版本 | Flink引擎版本 | 说明 |
V2.1及以上版本 | 8.0.5及以上版本 | 无需创建Replication Slot,有表的读取权限即可消费Binlog。 |
V2.0版本 | 8.0.5及以下版本 | 默认使用JDBC模式,需要为目标表创建Publication、为Publication创建Replication Slot后,再进行目标表的Binlog消费。 |
V1.3及以下版本 | 8.0.5及以下版本 | 默认使用Holohub模式,有表的读取权限即可消费Binlog。 |
Hologres V2.0版本后不再支持Holohub模式消费Binlog,升级Hologres实例到 V2.0及以上版本之前,建议先升级Flink版本至8.0.5,此后消费Binlog会自动使用JDBC模式。
准备工作:创建Publication和Replication Slot
Hologres V2.1版本前,需要先为目标表创建Publication、为Publication创建Replication Slot后,才可以进行Binlog消费。
Hologres V2.1版本起,除上述方法外,还支持仅有目标表读权限的用户进行Binlog消费。该方法无法查询Hologres侧记录的Binlog消费进度,建议消费端自行记录消费进度。
Publication
简介
本质上是一组表,这些表的数据更改旨在通过逻辑复制进行表中数据复制,详细内容请参见Publication。当前Hologres支持的Publication只支持绑定一张物理表,且该表需要开启Binlog功能。
创建Publication
语法示例
CREATE PUBLICATION name FOR TABLE table_name;
参数说明
参数 | 说明 |
name | 自定义Publication名称。 |
table_name | 数据库中表名称。 |
使用示例
--示例创建一个名为hg_publication_test_1的Publication,且将表test_message_src添加至该Publication下
CREATE publication hg_publication_test_1 FOR TABLE test_message_src;
查询已经创建的Publication
语法示例
SELECT * FROM pg_publication;
查询结果
pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
-----------------------+----------+--------------+-----------+-----------+-----------+-------------
hg_publication_test_1 | 16728 | f | t | t | t | t
(1 row)
参数 | 说明 |
pubname | Publication名称。 |
pubowner | Publication拥有者。 |
puballtables | 绑定多个物理表,默认为False,目前暂不支持。 |
pubinsert | 是否发布INSERT类型的Binlog,默认为True,Binlog类型请参考Binlog格式与原理。 |
pubupdate | 是否发布UPDATE类型的Binlog,默认为True。 |
pubdelete | 是否发布DELETE类型的Binlog,默认为True。 |
pubtruncate | 是否发布TRUNCATE类型的Binlog,默认为True。 |
查询Publication关联的表
语法示例
SELECT * FROM pg_publication_tables;
查询结果
pubname | schemaname | tablename
-----------------------+------------+------------------
hg_publication_test_1 | public | test_message_src
(1 row)
参数 | 说明 |
pubname | Publication名称。 |
schemaname | 表所属schema的名称。 |
tablename | 表名称。 |
删除Publication
语法示例
DROP PUBLICATION name;
name为已创建的Publication名称。
使用示例
DROP PUBLICATION hg_publication_test_1;
Replication Slot
简介
在逻辑复制场景下,一个Replication Slot表示一个数据的更改流,该Replication Slot也与当前消费进度绑定,用于断点续传,详细内容可以参见Postgres文档Replication Slot。Replication Slot用于维护Binlog消费的点位信息,使得消费端Failover之后可以从之前已经Commit的点位进行恢复。
权限说明
只有Superuser和Replication Role拥有创建和使用Replication Slot的权限。可以通过执行如下语句创建或移除Replication Role。
-- 使用superuser将普通用户设置为replication role:
ALTER role <user_name> replication;
-- 使用superuser将replication role设置回普通用户:
ALTER role <user_name> noreplication;
user_name为阿里云账号ID或RAM用户,详情请参见账号概述。
创建Replication Slot
语法示例
CALL hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
参数说明
参数 | 说明 |
replication_slot_name | 自定义Replication Slot的名称。 |
hgoutput | Binlog输出格式的插件,当前仅支持hgoutput内置插件。 |
publication_name | Replication Slot所绑定的Publication名称。 |
使用示例
--创建一个名称为hg_replication_slot_1的Replication Slot,并且绑定名称为hg_publication_test_1的Publication。
CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
查询已经创建的Replication Slot
语法示例
SELECT * FROM hologres.hg_replication_slot_properties;
查询结果
slot_name | property_key | property_value
-----------------------+--------------+-----------------------
hg_replication_slot_1 | plugin | hgoutput
hg_replication_slot_1 | publication | hg_publication_test_1
hg_replication_slot_1 | parallelism | 1
(3 rows)
参数 | 说明 |
slot_name | Replication Slot名称。 |
property_key | 包含如下三个参数。
|
property_value | property_key包含参数对应的值。 |
查询通过Replication Slot消费整张表Binlog所需的并发数
Hologres是一个分布式数仓,所以一张表的数据会分布在多个Shard上,所以使用JDBC消费Binlog的时候,需要启动多个客户端连接,才能消费到完整的Binlog数据。通过以下命令可以查询消费hg_replication_slot_1所需要的并发数。
语法示例
SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
查询结果
hg_get_logical_replication_slot_parallelism
------------------------------------------------
20
查询Replication Slot的消费进度(Hologres侧记录的Binlog消费进度)
语法示例
SELECT * FROM hologres.hg_replication_progress;
查询结果
slot_name | parallel_index | lsn
-----------------------+----------------+-----
hg_replication_slot_1 | 0 | 66
hg_replication_slot_1 | 1 | 122
hg_replication_slot_1 | 2 | 119
(0 rows)
参数 | 说明 |
slot_name | Replication Slot名称。 |
parallel_index | 并发序号。 |
lsn | 当前消费到最后的Binlog序号。 |
表hologres.hg_replication_progress在第一次消费Binlog后才会创建。
表hologres.hg_replication_progress实际记录的是用户主动commit的消费位点,需要用户在代码中手动调用commit lsn相关函数来提交Binlog点位信息。由于该表实际记录的内容完全取决于用户最后一次commit,因此该值并不能完全真实正确地反映用户侧实际的消费位点。因此,建议在消费端自行记录lsn,并将其作为消费终止时的恢复位点。下述JDBC消费Binlog与Holo-client消费Binlog的示例代码中,均不包含commit lsn的相关代码。
手动Commit Binlog点位信息,仅当使用replication slot消费Binlog时有效。当通过table name消费Binlog时,表hologres.hg_replication_progress中不会记录和保留该点位结果。
删除Replication Slot
语法示例
CALL hg_drop_logical_replication_slot('<replication_slot_name>');
replication_slot_name为已经创建的Replication Slot名称。
使用示例
CALL hg_drop_logical_replication_slot('hg_replication_slot_1');
使用JDBC消费Binlog
添加POM依赖
使用如下语句添加POM依赖。
说明添加POM依赖,请使用42.2.18及以上版本的JDBC。
<dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.3.8</version> </dependency> <!-- 用于获取表schema以及解析binlog --> <dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>holo-client</artifactId> <version>2.2.10</version> </dependency>
Java代码示例
import com.alibaba.hologres.client.HoloClient; import com.alibaba.hologres.client.HoloConfig; import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder; import com.alibaba.hologres.client.model.Record; import com.alibaba.hologres.client.model.TableSchema; import org.postgresql.PGConnection; import org.postgresql.PGProperty; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.DriverManager; import java.util.Arrays; import java.util.List; import java.util.Properties; public class Test { public static void main (String[] args) throws Exception { String username = ""; String password = ""; String url = "jdbc:postgresql://Endpoint:Port/db_test"; // 创建JDBC连接 Properties properties = new Properties(); PGProperty.USER.set(properties, username); PGProperty.PASSWORD.set(properties, password); PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); // 消费Binlog,务必加上以下参数 PGProperty.REPLICATION.set(properties, "database"); try (Connection connection = DriverManager.getConnection(url, properties)) { // 创建PGReplicationStream并绑定Replicaiton slot,需要指定shardId int shardId = 0; PGConnection pgConnection = connection.unwrap(PGConnection.class); PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream() .logical() // 2.1版本起,此处有两种可行的写法 // 方法1:withSlotName参数为准备阶段创建的Replication Slot名,不需填写withSlotOption("table_name","xxx") // 方法2:不需填写withSlotName参数,需填写withSlotOption("table_name","xxx") .withSlotName("slot_name") .withSlotOption("table_name","public.test_messsage_src") // 消费的表名 .withSlotOption("parallel_index", shardId) .withSlotOption("batch_size", "1024") .withSlotOption("start_time", "2021-01-01 00:00:00") .withSlotOption("start_lsn","0") .start(); // 尽管我们不直接使用holo-client的接口消费binlog,但是需要holo-client的接口去解析消费到的数据。 // 创建holo-client HoloConfig holoConfig = new HoloConfig(); holoConfig.setJdbcUrl(url); holoConfig.setUsername(username); holoConfig.setPassword(password); HoloClient client = new HoloClient(holoConfig); // 创建Binlog decoder用于Decode binary数据,schema需要通过HoloClient获取 TableSchema schema = client.getTableSchema("test_message_src", true); HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema); // 用于记录当前消费位点,用以在消费中断后,用该值进行继续消费 Long currentLsn = 0; // 消费数据 ByteBuffer byteBuffer = pgReplicationStream.readPending(); while (true) { if (byteBuffer != null) { List<BinlogRecord> records = decoder.decode(shardId, byteBuffer); Long latestLsn = 0L; for (BinlogRecord record : records) { latestLsn = record.getBinlogLsn(); // Do Something System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues())); } // 保存消费位点 currentLsn = latestLsn; pgReplicationStream.forceUpdateStatus(); } byteBuffer = pgReplicationStream.readPending(); } } // pgReplicationStream.close(); // connection.close(); }
创建PGReplicationStream时,需要通过withSlotName指定Replication Slot:
Hologres V2.1版本前,需要填写已创建的Replication Slot名称。
Hologres V2.1版本起,无需填写withSlotName,只需在Slot Options中指定目标表名。
此外,withSlotOption可以指定如下参数。
参数
是否必须
说明
table_name
当不指定withSlotName时必须,否则为无效参数。
当不指定withSlotName时,table_name代表了想要消费的目标表名。格式为schema_name.table_name或table_name。
parallel_index
是
在使用PGReplicationStream进行Binlog消费时,一个PGReplicationStream会建立1个Walsender连接,对目标表的1个Shard的Binlog进行消费。parallel_index即代表了消费目标表的第parallel_index个Shard的数据。
假设某个表有3个Shard,则通过Replication Slot消费Binlog所需要的并发数为3,则用户最多可以创建3个PGReplicationStream,每个PGReplicationStream的parallel_index参数分别是0、1、2。
当前JDBC消费Hologres Binlog并不支持类似Kafka Consumer Group的实现,所以需要用户自行创建多个PGReplicationStream。
start_time
否
表示从某个时间点位开始消费Binlog,示例参数格式为:2021-01-01 12:00:00+08。
如果未指定start_lsn或start_time,分为如下三种情况:
第一次开始消费Replication Slot的Binlog,则从头开始消费,类似Kafka的Oldest。
曾经消费过Replication Slot的Binlog,则尝试从之前Commit过的点位开始消费。
对于不指定withSlotName但指定了table_name的使用场景,不论是否曾经消费过该表的Binlog,都会从头开始消费。
start_lsn
否
表示从某个lsn之后开始消费Binlog,同时设置优先级高于start_time。
batch_size
否
单次获取的Binlog最大批大小,单位为行,默认值为1024。
说明BinlogRecord 是decoder返回的Record类型,可以通过以下接口获取这条数据对应的binlog系统字段,详情见订阅Hologres Binlog。
getBinlogLsn() 获取binlog的序号。
getBinlogTimestamp() 获取Binlog的系统时间戳。
getBinlogEventType() 获取Binlog的事件类型。
消费Binlog之后,用户需要手动Commit点位信息,确保下次Failover能够恢复。
使用Holo Client消费Binlog
消费Hologres Binlog功能已经集成至Holo Client中,您可以通过指定需要消费的物理表,方便地消费所有Shard的Binlog数据。
使用Holo Client消费Binlog,需要占用与物理表shard数(slot并发数)相同的连接数,请保证连接数充足。
使用Holo Client消费Binlog过程中,推荐您根据Shard自行保存消费点位,在由于网络连接失败等原因导致消费终止时,可以通过消费点位进行恢复,详情请参见下方代码示例。
添加POM依赖
使用如下语句添加POM依赖。
说明添加POM依赖,推荐使用2.2.10及以上版本的Holo Client,2.2.9及之前的版本存在内存泄露的问题。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>holo-client</artifactId> <version>2.2.10</version> </dependency>
Java代码示例
import com.alibaba.hologres.client.BinlogShardGroupReader; import com.alibaba.hologres.client.Command; import com.alibaba.hologres.client.HoloClient; import com.alibaba.hologres.client.HoloConfig; import com.alibaba.hologres.client.Subscribe; import com.alibaba.hologres.client.exception.HoloClientException; import com.alibaba.hologres.client.impl.binlog.BinlogOffset; import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord; import com.alibaba.hologres.client.model.binlog.BinlogRecord; import java.util.HashMap; import java.util.Map; public class HoloBinlogExample { public static BinlogShardGroupReader reader; public static void main(String[] args) throws Exception { String username = ""; String password = ""; String url = "jdbc:postgresql://ip:port/database"; String tableName = "test_message_src"; String slotName = "hg_replication_slot_1"; // 创建client的参数 HoloConfig holoConfig = new HoloConfig(); holoConfig.setJdbcUrl(url); holoConfig.setUsername(username); holoConfig.setPassword(password); holoConfig.setBinlogReadBatchSize(128); holoConfig.setBinlogIgnoreDelete(true); holoConfig.setBinlogIgnoreBeforeUpdate(true); holoConfig.setBinlogHeartBeatIntervalMs(5000L); HoloClient client = new HoloClient(holoConfig); // 获取表的shard数 int shardCount = Command.getShardCount(client, client.getTableSchema(tableName)); // 使用map保存每个shard的消费进度, 初始化为0 Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount); for (int i = 0; i < shardCount; i++) { shardIdToLsn.put(i, 0L); } // 消费binlog的请求,2.1版本前tableName和slotname为必要参数,2.1版本起仅需传入tableName(等价于前文使用的固定slotName“hg_table_name_slot”)。 // Subscribe有StartTimeBuilder和OffsetBuilder两种,此处以前者为例 Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName) .setBinlogReadStartTime("2021-01-01 12:00:00") .build(); // 创建binlog reader reader = client.binlogSubscribe(subscribe); BinlogRecord record; int retryCount = 0; long count = 0; while(true) { try { if (reader.isCanceled()) { // 根据保存的消费点位重新创建reader reader = client.binlogSubscribe(subscribe); } while ((record = reader.getBinlogRecord()) != null) { // 消费到最新 if (record instanceof BinlogHeartBeatRecord) { // do something continue; } // 处理读取到的binlog record,这里只做打印 System.out.println(record); // 处理之后保存消费点位,异常时可以从此点位恢复 shardIdToLsn.put(record.getShardId(), record.getBinlogLsn()); count++; // 读取成功,重置重试次数 retryCount = 0; } } catch (HoloClientException e) { if (++retryCount > 10) { throw new RuntimeException(e); } // 发生异常时推荐打印warn级别日志 System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount)); // 重试期间进行一定时间的等待 Thread.sleep(5000L * retryCount); // 用OffsetBuilder创建Subscribe,从而为每个shard指定起始消费点位 Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName); for (int i = 0; i < shardCount; i++) { // BinlogOffset通过setSequence指定lsn,通过setTimestamp指定时间,两者同时指定lsn优先级大于时间戳 // 这里根据shardIdToLsn这个Map中保存的消费进度进行恢复 subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i))); } subscribe = subscribeBuilder.build(); // 关闭reader reader.cancel(); } } } }
使用Holo Client消费Binlog时可以指定如下参数。
参数
是否必须
默认值
说明
binlogReadBatchSize
否
1024
从每个Shard单次获取的Binlog最大批次大小,单位为行。
binlogHeartBeatIntervalMs
否
-1
binlogRead发送BinlogHeartBeatRecord的间隔,
-1
表示不发送。当Binlog没有新数据,每间隔binlogHeartBeatIntervalMs会下发一条BinlogHeartBeatRecord,此record的timestamp表示截止到这个时间这个Shard上的数据都已经消费完成。
binlogIgnoreDelete
否
false
是否忽略Delete类型的Binlog。
binlogIgnoreBeforeUpdate
否
false
是否忽略BeforeUpdate类型的Binlog。
常见问题
消费Binlog并提交消费进度后,发现表hologres.hg_replication_progress不存在,或表中没有消费进度数据,可能原因如下:
消费时不通过Replication Slot进行,即不指定参数withSlotName,该场景不支持记录消费进度。
使用了只读从实例,且该DB是第一次被消费Binlog,此时hologres.hg_replication_progress表创建失败。Hologres V2.0.18版本起已修复,从实例可以正常消费Binlog。Hologres V2.0.18版本前,需要使用主实例先消费一次Binlog,从实例即可正常消费。
如果不是上述原因,请加入Hologres钉钉交流群联系值班人员处理,详情请参见如何获取更多的在线支持?。