实时计算Flink版实时消费Hologres

更新时间:2025-01-16 02:33:12

Hologres连接器支持实时消费Hologres,即实时消费HologresBinlog数据。本文为您介绍实时计算Flink版消费Hologres的详情。

使用限制

  • Hologres 0.10及以下版本,已存在的表无法修改表属性开启Binlog,需要重新建表。Hologres V1.1及以上版本,可以根据业务需要选择开启或关闭Binlog能力,同时支持配置TTL满足不同业务场景对Binlog保留时间的诉求,详情请参见订阅Hologres Binlog

  • 实时计算引擎VVR 8.0.11及以上版本支持消费分区表父表的Binlog。

  • 暂不支持实时消费TIMESTAMP类型的数据,因此创建Hologres表时,请使用TIMESTAMPTZ类型。

  • 默认的Binlog源表不支持数组类型,仅支持INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)和TIMESTAMPTZ数据类型。

    说明

    对不支持的数据类型(例如SMALLINT),即使不消费此字段,仍然可能导致作业无法上线。

  • 实时计算引擎VVR 6.0.3及以上版本新增JDBC模式Binlog源表,VVR 6.0.7版本开始默认通过JDBC模式消费Hologres Binlog。相比原有Holohub模式,支持更完善的数据类型,如SMALLINT,数组类型等,同时也支持了自定义用户(非RAM用户)。详见下方JDBC模式Binlog源表

    Hologres 2.0及以上版本下线了Holohub模式,全面转为JDBC模式。如果您的Flink版本小于6.0.7,需要显式指定sdkMode参数为jdbc,或升级您的Flink版本。

  • Hologres 1.3.41版本开始,JDBC模式Binlog源表新支持读取JSONB类型,但需要数据库级别开启GUC,开启GUC的命令如下。

    --db级别开启GUC,仅superuser可以执行,每个db只需要设置一次。
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • 实时计算引擎VVR 8.0.4起,连接器如果发现用户使用的Hologres实例大于2.0版本,会强制使用JDBC模式消费Binlog。推荐Hologres实例升级至2.1版本,可以从Holohub模式无缝切换。如果Hologres实例是2.0版本,且用户不是Superuser,使用JDBC模式消费Binlog需要特别进行权限的配置,否则作业上线时可能抛出“permission denied for database”的异常。需要的权限包括DatabaseCREATE权限,以及实例的Replication Role权限,授权SQL如下。

    -- 专家权限模型下为用户授予CREATE权限,以及赋予用户实例的Replication Role权限
    GRANT CREATE ON DATABASE database_name TO <user_name>;
    alter role <user_name> replication;
    
    -- 如果Database开启了简单权限模型(SLMP),无法执行GRANT语句,使用spm_grant为用户授予DB的Admin权限,也可以在Holoweb中直接赋权
    call spm_grant('{dbname}_admin', '云账号id/云邮箱/RAM账号');
    alter role <user_name> replication;

注意事项

  • Hologres Binlog以行存的形式记录了数据的变更前后的整行数据,因此列存表生成Binlog时的反查开销要大于行存表。对于数据更新频繁的场景,建议使用行存表来开启Binlog,否则Binlog生成会成为表写入时的性能瓶颈,如果这张表同时还用于OLAP等分析查询,建议使用行列共存的存储格式。

  • UPDATE操作会产生两条Binlog记录,分别为更新操作前和操作后的数据记录,因此您会消费到两条数据。但是,Hologres Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。

  • 建议Flink作业并发数和Hologres TableShard个数保持一致。

    您可以在Hologres控制台上,使用以下语句查看TableShard数,其中tablename为您的业务表名称。

    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • 如果作业从检查点恢复过程中,发生table id parsed from checkpoint is different from the current table id异常,可以升级到VVR-8.0.9版本启动作业。这是由于实时计算引擎VVR 8.0.5~VVR 8.0.8版本,Hologres Binlog源表从checkpoint恢复时,会强制检查hologres表的table id,如果当前表的table idcheckpoint中保存的不一致,会无法从checkpoint恢复。此异常表示作业运行期间,用户对源表进行了TRUNCATE或其他重建表操作。考虑到用户使用场景的复杂性,在VVR 8.0.9取消了对table id的强制检查,但仍然不推荐对Binlog源表做重建表操作。重建表时原有表的历史Binlog会全部清除,Flink使用旧表的消费位点去消费新表的数据,可能导致数据不一致等不符合预期的情况。

开启Binlog

实时消费功能默认关闭,因此在Hologres控制台上创建表的DDL时,需要设置binlog.levelbinlog.ttl参数,示例如下。

begin;
CREATE TABLE test_message_src(
 id int primary key, 
 title text not null, 
 body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica'); --自Hologres 1.1版本起,可以在建表后开启Binlog。
call set_table_property('test_message_src', 'binlog.ttl', '86400'); 
commit;

其中,binlog.level设置为replica即代表开启Binlog,binlog.ttlBinlogTTL,单位为秒。

消费模式

非CDC模式
CDC模式

该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是INSERT类型的数据,您可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。源表DDL代码示例如下。

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

该模式下Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型,例如,INSERT、DELETE、UPDATE_BEFOREUPDATE_AFTER类型,这样就能完成表的数据的镜像同步,类似MySQLPostgresCDC功能。源表DDL代码示例如下。

CREATE TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

全增量一体源表消费

在源表Join维表时,由于BinlogTTL等原因,会导致无法使用源表的所有数据。原解决方案是为Binlog表设置一个很大的TTL,但这样会有以下问题:

  • 历史Binlog数据会被长时间保存,导致占用较多的存储资源。

  • 因为Binlog包含数据更新记录,使用Binlog进行全量消费会消费一些不必要的数据,导致占用较多的计算资源,且无法让用户只关注最新的数据。

VVR 4.0.13及以上版本,Hologres 0.10及以上版本,Hologres Binlog CDC源表支持全增量一体的消费,这种方式会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据。采用这种方式,可以解决上述问题。

适用场景

  • 适用于历史数据不包含Binlog,但又希望消费所有数据的场景。

  • 仅适用于目标表有主键的场景,推荐在CDC模式下使用的全增量Hologres源表。

  • Hologres1.1版本之后,支持按需开启Binlog,可以将已有历史数据的表打开Binlog。

代码示例

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --先读取历史全量数据,再增量消费Binlog。
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
  );

JDBC模式Binlog源表

实时计算引擎VVR 6.0.7版本开始,binlog源表新增JDBC模式(不同于CDC等消费模式,此处的JDBC模式是指底层获取binlogSDK基于JDBC)。相比原有Holohub模式,JDBC模式的Binlog源表:

  • 支持更多的数据类型。包括:SMALLINT、INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、int4[]、int8[]、float4[]、float8[]、boolean[]、text[]、JSONB(需要Hologres版本大于1.3.41且开启相应GUC,详见本文使用限制)。

  • 支持Hologres的自定义用户(非RAM用户)。

使用方式与普通的binlog源表类似,但需要设置sdkModejdbc,示例如下。

create TEMPORARY table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc', --使用jdbc模式的binlog源表
  'jdbcBinlogSlotName'='replication_slot_name' --可选,不设置会自动创建
);

jdbcBinlogSlotNamejdbc模式消费binlog的一个可选参数,如果不设置,Hologres连接器会创建默认的slot并使用,默认创建的publication名称类似publication_for_table_<table_name>_used_by_flink,默认创建的slot名称类似slot_for_table_<table_name>_used_by_flink,在使用中如果发生异常,可以尝试删除并重试。默认创建slot需要一定的前提条件,要求用户为实例的Superuser,或者同时拥有DatabaseCREATE权限和实例的Replication Role权限。如果没有权限导致作业上线失败,可以尝试如下操作,或者参考通过JDBC消费Hologres Binlog文档进行处理。Hologres2.1版本起,JDBC模式消费Binlog不再需要配置slot,因此Hologres连接器从VVR 8.0.5开始,判断Hologres实例为2.1及以上版本,也不再自动创建默认的slot。

-- 专家权限模型下为用户授予CREATE权限,以及赋予用户实例的Replication Role权限
GRANT CREATE ON DATABASE database_name TO <user_name>;
alter role <user_name> replication;

-- 如果Database开启了简单权限模型(SLMP),无法执行GRANT语句,使用spm_grant为用户授予DB的Admin权限,也可以在Holoweb中直接赋权
call spm_grant('{dbname}_admin', '云账号id/云邮箱/RAM账号');
alter role <user_name> replication;
说明

目前删除表并重建同名表可能导致作业出现"no table is defined in publication"或者"The table xxx has no slot named xxx"异常,原因是表被删除时,和表绑定的publication没有被删除。当发生此异常时,可以在hologres中执行select * from pg_publication where pubname not in (select pubname from pg_publication_tables);语句,查询删表时未一起被清理的publication,并执行drop publication xx;语句删除残留的publication,之后重新启动作业即可。或者选择VVR 8.0.5版本,连接器会自动执行清理操作。

消费分区表Binlog(公测)

分区表是数仓使用过程中一种常用的建表方式,有利于对数据进行归档和整理,提高查询的效率。在实时数仓分层等场景,消费Hologres Binlog作为数据源表,可以提升数据复用能力,缩短端到端数据加工耗时。Hologres Connector支持JDBC模式消费分区表Binlog,可以通过一个作业持续监听分区表的数据更改,而不需要启动多个作业。结合Hologres的动态分区能力,还可以动态地监听新增的分区,得到与消费非分区表一致的体验。

注意事项

  • 仅实时计算引擎VVR 8.0.11及以上版本,Binlog源表JDBC模式支持消费分区表。

  • 分区名称必须严格由父表名+下划线+分区值组成,即{parent_table}_{partition_value},非此格式的分区可能无法消费到。对于DYNAMIC模式,分区值格式与动态分区的时间单位有关,目前不支持带-分隔符的分区字段,详情请参见表名生成规则。如分区时间单位为DAY,则表的时间后缀格式必须采用YYYYMMDD,例如20241225。

  • Flink中声明Hologres源表时,必须包含Hologres分区表的分区字段。

  • 对于DYNAMIC模式,要求分区表必须开启动态分区管理。并且分区预创建参数auto_partitioning.num_precreate必须大于1,否则,在尝试消费最新分区时,作业将会抛出异常。

  • JDBC模式消费Binlog存在连接数的限制,消费分区表需要使用jdbc_fixed模式,要求Hologres实例版本大于等于2.1.27。

WITH参数

消费分区表WITH参数,其他参数请参考实时数仓Hologres

参数

说明

数据类型

是否必填

默认值

备注

参数

说明

数据类型

是否必填

默认值

备注

partition-binlog.mode

消费分区表Binlog模式。

Enum

DISABLE

参数取值如下:

  • DISABLE(默认值):源表是非分区表,如果指定的Hologres表为分区表,将抛出异常。

  • DYNAMIC:持续消费分区表的最新分区。分区表必须开启动态分区管理,DYNAMIC模式会按照从旧到新的顺序消费各个分区。当消费到次新分区时,会在新的单位时间到来时,开启最新分区的消费。

  • STATIC:消费分区表的固定分区,可同时消费多个分区。分区在消费过程中无法新增或移除。默认消费此父表的所有分区。

partition-binlog-lateness-timeout-minutes

DYNAMIC模式下消费分区表,允许延迟的最大超时时间。

Boolean

60

  • 单位为分钟,DYNAMIC模式会在新的单位时间到来时开启当前时间对应的最新分区的消费,但不会立刻关闭前一个分区,而是会持续监听以保证可以读取到上一个分区的延迟数据。

    例如:如果动态分区以DAY为单位,分区是20240920,允许数据最大延迟是1小时,对于这个分区,其消费会在2024-09-21 01:00:00关闭,而不是在2024-09-21 00:00:00停止消费。

  • lateness-timeout时间不允许超过分区的单位时间。

    如果按天分区,其最大值为24 * 60 = 1440(min),DYNAMIC模式大多数时间只会同时消费一张表,在延迟时间内可能同时消费两个分区。

partition-values-to-read

STATIC模式下消费分区表,指定所需消费的分区,分区值之间使用','进行分隔。

String

  • 不配置此参数时,STATIC模式会消费指定父表的所有分区,指定时则仅消费被指定的分区。

  • 此参数仅需要填写分区值,不需要完整的分区名称,多个分区值使用,分割。目前不支持通过正则表达式配置。

消费分区表时,启动模式binlogStartupMode和从Checkpoint恢复会有如下表现:

  • 消费分区表Binglog模式为DYNAMIC。

    启动模式

    说明

    启动模式

    说明

    binlogStartupMode=earliestOffset(默认值)

    从目前存在的最早分区的最早Binlog开始消费。

    binlogStartupMode=timestamp

    根据设置的startTime找到对应的分区,从这张分区的startTime开始消费。如指定起始时间为2024-09-10 10:00:00,则分区为20240910,此分区从2024-09-10 10:00:00开始消费,之后正常消费20240911等分区。

    binlogStartupMode=initial

    先全量消费数据,记录每个分区每个Shard消费的最大Binlog序号,增量消费会选择最新的两个分区,尝试从这两个分区每个Shard对应的LSN开始增量消费。

    Checkpoint恢复

    保存Checkpoint时,会记录每个Shard最新两个分区的最大Binlog序号,恢复时从记录的最大Binlog序号开始增量Binlog消费。

  • 消费分区表Binglog模式为STATIC。

    启动模式

    说明

    启动模式

    说明

    binlogStartupMode=earliestOffset(默认值)

    所有分区(或指定分区)从最早的Binlog开始消费。

    binlogStartupMode=timestamp

    所有分区(或指定分区)从设置的startTime开始消费Binlog。

    binlogStartupMode=initial

    先全量消费数据,记录每个分区每个Shard消费的最大Binlog序号,再从此最大Binlog序号开始增量Binlog消费。

    checkpoint恢复

    保存checkpoint时,会记录每个分区每个shard消费的最大Binlog序号,恢复时从此最大Binlog序号开始增量Binlog消费。

使用示例

DYNAMIC模式
STATIC模式

假设Hologres存在如下的DDL分区表,并且已启用Binlog以及动态分区。

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- 开启动态分区
    auto_partitioning_time_unit = 'DAY',  -- 以天为时间单元,自动创建的分区名示例:test_message_src1_20241027,test_message_src1_20241028
    auto_partitioning_num_precreate = '2' -- 会提前创建两个分区
);

-- 已经存在的分区表,也可以通过ALTER TABLE方式开启动态分区

Flink中,使用以下SQL声明对分区表test_message_src1进行DYNAMIC模式消费。当新的单位时间到达时,将自动启动对新分区的读取。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- hologres分区表的分区字段
)
with (
   'connector' = 'hologres'
  ,'dbname' = '<yourDatabase>'
  ,'tablename' = 'test_message_src1'  -- 开启了动态分区的父表
  ,'username' = '<yourUserName>'
  ,'password' = '<yourPassword>'
  ,'endpoint' = '<yourEndpoint>'
  ,'binlog' = 'true'
  ,'partition-binlog.mode' = 'DYNAMIC' -- 动态监听最新的分区
  ,'binlogstartUpMode' = 'initial'  -- 全增量
  ,'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免连接数限制
);

假设Hologres存在如下的DDL分区表,并且已启用Binlog。

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Flink中,使用以下SQL声明对分区表test_message_src2进行STATIC模式消费。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- hologres分区表的分区字段
)
with (
   'connector' = 'hologres'
  ,'dbname' = '<yourDatabase>'
  ,'tablename' = 'test_message_src2'  -- 分区表
  ,'username' = '<yourUserName>'
  ,'password' = '<yourPassword>'
  ,'endpoint' = '<yourEndpoint>'
  ,'binlog' = 'true'
  ,'partition-binlog.mode' = 'STATIC' -- 消费固定的分区
  ,'partition-values-to-read' = 'red,blue,green'  -- 仅消费配置的3个分区,不会消费'black'分区;以后新增分区也不会消费
  ,'binlogstartUpMode' = 'initial'  -- 全增量
  ,'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免连接数限制
);

Hologres Binlog字段组成

一条Binlog的字段由Binlog系统字段和用户Table字段组成,字段定义如下:

字段名

字段类型

说明

字段名

字段类型

说明

hg_binlog_lsn

BIGINT

Binlog系统字段,表示Binlog序号,Shard内部单调递增但不连续,不同Shard之间不保证唯一和有序。

hg_binlog_event_type

BIGINT

Binlog系统字段,表示当前记录所表示的修改类型,参数取值如下:

  • INSERT=5:表示当前Binlog为插入一条新的记录。

  • DELETE=2:表示当前Binlog为删除一条已有的记录。

  • BEFORE_UPDATE=3:表示当前Binlog为更新操作前的记录。

  • AFTER_UPDATE=7:表示当前Binlog为更新操作后的记录。

hg_binlog_timestamp_us

BIGINT

Binlog系统字段,系统时间戳,单位为微秒。

user_table_column_1

用户定义

用户的表字段。

...

...

用户的表字段。

user_table_column_n

用户定义

用户的表字段。

元数据列

实时计算引擎VVR 8.0.11及以上版本的Binlog源表支持元数据列。从此版本起,建议以元数据列的方式声明hg_binlog_event_typeBinlog字段。元数据列SQL标准的扩展,通过元数据列可以访问源表的库名和表名,以及数据的变更类型,产生时间等特定信息,您可以基于这些信息自定义处理逻辑,例如过滤变更类型为DELETE的数据等。

字段名

字段类型

说明

字段名

字段类型

说明

db_name

STRING NOT NULL

包含该行记录的库名。

table_name

STRING NOT NULL

包含该行记录的表名。

hg_binlog_lsn

BIGINT NOT NULL

该行记录的Binlog序号,详情请参见Hologres Binlog字段组成

hg_binlog_timestamp_us

BIGINT NOT NULL

该行记录在数据库中的变更时间戳,单位微秒(us)。

hg_binlog_event_type

BIGINT NOT NULL

该行记录的变更类型。参数取值如下:

  • 5:表示INSERT消息。

  • 2:表示DELETE消息。

  • 3:表示UPDATE_BEFORE消息。

  • 7:表示UPDATE_AFTER消息。

hg_shard_id

INT NOT NULL

数据所在数据分片Shard。 Shard基本概念详情请参见Table GroupShard

DDL中,采用<meta_column_name> <datatype> METADATA VIRTUAL声明元数据列。示例如下:

CREATE TABLE test_message_src_binlog_table(
  -- hg_binlog_lsn BIGINT,
  -- hg_binlog_event_type BIGINT,
  -- hg_binlog_timestamp_us BIGINT,
  hg_binlog_lsn bigint METADATA VIRTUAL
  hg_binlog_event_type bigint METADATA VIRTUAL
  hg_binlog_timestamp_us bigint METADATA VIRTUAL
  hg_shard_id int METADATA VIRTUAL
  db_name string METADATA VIRTUAL
  table_name string METADATA VIRTUAL
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --先读取历史全量数据,再增量消费Binlog。
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
  );
  • 本页导读 (1)
  • 使用限制
  • 注意事项
  • 开启Binlog
  • 消费模式
  • 全增量一体源表消费
  • 适用场景
  • 代码示例
  • JDBC模式Binlog源表
  • 消费分区表Binlog(公测)
  • 注意事项
  • WITH参数
  • 使用示例
  • Hologres Binlog字段组成
  • 元数据列
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等