实时计算Flink版实时消费Hologres

Hologres连接器支持实时消费Hologres,即实时消费Hologres的Binlog数据。本文为您介绍实时计算Flink版消费Hologres的详情。

使用限制

  • Hologres 0.10及以下版本,已存在的表无法修改表属性开启Binlog,需要重新建表。Hologres V1.1及以上版本,可以根据业务需要选择开启或关闭Binlog能力,同时支持配置TTL满足不同业务场景对Binlog保留时间的诉求,详情请参见订阅Hologres Binlog

  • 不支持开启分区表父表的Binlog,请使用非分区表。

  • 暂不支持实时消费TIMESTAMP类型的数据,因此创建Hologres表时,请使用TIMESTAMPTZ类型。

  • 默认的Binlog源表不支持数组类型,仅支持INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)和TIMESTAMPTZ数据类型。

    说明

    对不支持的数据类型(例如SMALLINT),即使不消费此字段,仍然可能导致作业无法上线。

  • 实时计算引擎VVR 6.0.3及以上版本新增JDBC模式Binlog源表,VVR 6.0.7版本开始默认通过JDBC模式消费Hologres Binlog。相比原有Holohub模式,支持更完善的数据类型,如SMALLINT,数组类型等,同时也支持了自定义用户(非RAM用户)。详见下方JDBC模式Binlog源表

    Hologres 2.0及以上版本下线了Holohub模式,全面转为JDBC模式。如果您的Flink版本小于6.0.7,需要显式指定sdkMode参数为jdbc,或升级您的Flink版本。

  • Hologres 1.3.41版本开始,JDBC模式Binlog源表新支持读取JSONB类型,但需要数据库级别开启GUC,开启GUC的命令如下。

    --db级别开启GUC,仅superuser可以执行,每个db只需要设置一次。
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • 实时计算引擎VVR 8.0.4起,连接器如果发现用户使用的Hologres实例大于2.0版本,会强制使用JDBC模式消费Binlog。推荐Hologres实例升级至2.1版本,可以从Holohub模式无缝切换。如果Hologres实例是2.0版本,且用户不是Superuser,使用JDBC模式消费Binlog需要特别进行权限的配置,否则作业上线时可能抛出“permission denied for database”的异常。需要的权限包括Database的CREATE权限,以及实例的Replication Role权限,授权SQL如下。

    -- 专家权限模型下为用户授予CREATE权限,以及赋予用户实例的Replication Role权限
    GRANT CREATE ON DATABASE database_name TO <user_name>;
    alter role <user_name> replication;
    
    -- 如果Database开启了简单权限模型(SLMP),无法执行GRANT语句,使用spm_grant为用户授予DB的Admin权限,也可以在Holoweb中直接赋权
    call spm_grant('{dbname}_admin', '云账号id/云邮箱/RAM账号');
    alter role <user_name> replication;

注意事项

  • Hologres Binlog以行存的形式记录了数据的变更前后的整行数据,因此列存表生成Binlog时的反查开销要大于行存表。对于数据更新频繁的场景,建议使用行存表来开启Binlog,否则Binlog生成会成为表写入时的性能瓶颈,如果这张表同时还用于OLAP等分析查询,建议使用行列共存的存储格式。

  • UPDATE操作会产生两条Binlog记录,分别为更新操作前和操作后的数据记录,因此您会消费到两条数据。但是,Hologres Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。

  • 建议Flink作业并发数和Hologres Table的Shard个数保持一致。

    您可以在Hologres控制台上,使用以下语句查看Table的Shard数,其中tablename为您的业务表名称。

    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • 如果作业从检查点恢复过程中,发生table id parsed from checkpoint is different from the current table id异常,可以升级到VVR-8.0.9版本启动作业。这是由于实时计算引擎VVR 8.0.5~VVR 8.0.8版本,Hologres Binlog源表从checkpoint恢复时,会强制检查hologres表的table id,如果当前表的table id和checkpoint中保存的不一致,会无法从checkpoint恢复。此异常表示作业运行期间,用户对源表进行了TRUNCATE或其他重建表操作。考虑到用户使用场景的复杂性,在VVR 8.0.9取消了对table id的强制检查,但仍然不推荐对Binlog源表做重建表操作。重建表时原有表的历史Binlog会全部清除,Flink使用旧表的消费位点去消费新表的数据,可能导致数据不一致等不符合预期的情况。

开启Binlog

实时消费功能默认关闭,因此在Hologres控制台上创建表的DDL时,需要设置binlog.levelbinlog.ttl参数,示例如下。

begin;
CREATE TABLE test_message_src(
 id int primary key, 
 title text not null, 
 body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica'); --自Hologres 1.1版本起,可以在建表后开启Binlog。
call set_table_property('test_message_src', 'binlog.ttl', '86400'); 
commit;

其中,binlog.level设置为replica即代表开启Binlog,binlog.ttl为Binlog的TTL,单位为秒。

消费模式

非CDC模式

该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是INSERT类型的数据,您可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。源表DDL代码示例如下。

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

CDC模式

该模式下Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER类型,这样就能完成表的数据的镜像同步,类似MySQL或Postgres的CDC功能。源表DDL代码示例如下。

CREATE TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

全增量一体源表消费

在源表Join维表时,由于Binlog的TTL等原因,会导致无法使用源表的所有数据。原解决方案是为Binlog表设置一个很大的TTL,但这样会有以下问题:

  • 历史Binlog数据会被长时间保存,导致占用较多的存储资源。

  • 因为Binlog包含数据更新记录,使用Binlog进行全量消费会消费一些不必要的数据,导致占用较多的计算资源,且无法让用户只关注最新的数据。

从VVR 4.0.13及以上版本,Hologres 0.10及以上版本,Hologres Binlog CDC源表支持全增量一体的消费,这种方式会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据。采用这种方式,可以解决上述问题。

适用场景

  • 适用于历史数据不包含Binlog,但又希望消费所有数据的场景。

  • 仅适用于目标表有主键的场景,推荐在CDC模式下使用的全增量Hologres源表。

  • Hologres1.1版本之后,支持按需开启Binlog,可以将已有历史数据的表打开Binlog。

代码示例

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --先读取历史全量数据,再增量消费Binlog。
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
  );

JDBC模式Binlog源表

实时计算引擎VVR 6.0.7版本开始,binlog源表新增JDBC模式(不同于CDC等消费模式,此处的JDBC模式是指底层获取binlog的SDK基于JDBC)。相比原有Holohub模式,JDBC模式的Binlog源表:

  • 支持更多的数据类型。包括:SMALLINT、INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、int4[]、int8[]、float4[]、float8[]、boolean[]、text[]、JSONB(需要Hologres版本大于1.3.41且开启相应GUC,详见本文使用限制)。

  • 支持Hologres的自定义用户(非RAM用户)。

使用方式与普通的binlog源表类似,但需要设置sdkMode为jdbc,示例如下。

create TEMPORARY table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc', --使用jdbc模式的binlog源表
  'jdbcBinlogSlotName'='replication_slot_name' --可选,不设置会自动创建
);

jdbcBinlogSlotName是jdbc模式消费binlog的一个可选参数,如果不设置,Hologres连接器会创建默认的slot并使用,默认创建的publication名称类似publication_for_table_<table_name>_used_by_flink,默认创建的slot名称类似slot_for_table_<table_name>_used_by_flink,在使用中如果发生异常,可以尝试删除并重试。默认创建slot需要一定的前提条件,要求用户为实例的Superuser,或者同时拥有Database的CREATE权限和实例的Replication Role权限。如果没有权限导致作业上线失败,可以尝试如下操作,或者参考通过JDBC消费Hologres Binlog文档进行处理。Hologres2.1版本起,JDBC模式消费Binlog不再需要配置slot,因此Hologres连接器从VVR 8.0.5开始,判断Hologres实例为2.1及以上版本,也不再自动创建默认的slot。

-- 专家权限模型下为用户授予CREATE权限,以及赋予用户实例的Replication Role权限
GRANT CREATE ON DATABASE database_name TO <user_name>;
alter role <user_name> replication;

-- 如果Database开启了简单权限模型(SLMP),无法执行GRANT语句,使用spm_grant为用户授予DB的Admin权限,也可以在Holoweb中直接赋权
call spm_grant('{dbname}_admin', '云账号id/云邮箱/RAM账号');
alter role <user_name> replication;

说明

目前删除表并重建同名表可能导致作业出现"no table is defined in publication"或者"The table xxx has no slot named xxx"异常,原因是表被删除时,和表绑定的publication没有被删除。当发生此异常时,可以在hologres中执行select * from pg_publication where pubname not in (select pubname from pg_publication_tables);语句,查询删表时未一起被清理的publication,并执行drop publication xx;语句删除残留的publication,之后重新启动作业即可。或者选择VVR 8.0.5版本,连接器会自动执行清理操作。

Hologres Binlog实现原理

一条Binlog的字段由Binlog系统字段和用户Table字段组成,字段定义如下:

字段名

字段类型

说明

hg_binlog_lsn

BIGINT

Binlog系统字段,表示Binlog序号,Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。

hg_binlog_event_type

BIGINT

Binlog系统字段,表示当前记录所表示的修改类型,参数取值如下:

  • INSERT=5:表示当前Binlog为插入一条新的记录。

  • DELETE=2:表示当前Binlog为删除一条已有的记录。

  • BEFORE_UPDATE=3:表示当前Binlog为更新操作前的记录。

  • AFTER_UPDATE=7:表示当前Binlog为更新操作后的记录。

hg_binlog_timestamp_us

BIGINT

Binlog系统字段,系统时间戳,单位为微秒。

user_table_column_1

用户定义

用户的表字段。

...

...

用户的表字段。

user_table_column_n

用户定义

用户的表字段。