Flink/Blink实时消费Hologres Binlog

更新时间: 2023-07-13 16:02:11

本文将会为您介绍如何通过Flink和Blink实时消费Hologres Binlog。

注意事项

消费Hologres Binlog需要注意如下事项:

  • 仅Hologres V0.9及以上版本支持消费Hologres Binlog;仅Hologres V1.3.21及以上版本支持配置引擎白名单,HologresV1.3.21以下版本当前暂不支持配置引擎白名单,开启白名单后,会造成Binlog消费失败。如果您的实例版本低于所要求实例版本,请您加入Hologres钉钉群进行反馈,详情可参见在线支持

  • Hologres支持单表级别的Binlog功能,支持行存表和列存表,以及从Hologres V1.1版本开始支持行列共存表。开启Binlog后,理论上列存表的开销要大于行存表的开销。因此对于数据更新频繁的场景,建议为使用行存存储格式的表开启Binlog。

  • Hologres Binlog的支持情况以及开启、配置Hologres Binlog,请参见订阅Hologres Binlog

  • 仅阿里云Flink支持消费Hologres Binlog。Holohub模式下Flink消费Hologres Binlog只支持简单数据类型,从Flink 6.0.3版本开始,支持通过JDBC模式消费Hologres Binlog,相比Holohub,JDBC支持更多的数据类型,详情请参见Blink/Flink与Hologres的数据类型映射。同时增加了部分权限限制,详情请参见权限说明

  • Hologres V2.0版本起不支持Holohub模式,全面转为JDBC模式。从Flink 6.0.7版本开始,默认通过JDBC模式消费Hologres Binlog。如果您的Flink版本小于6.0.7,需要显式指定sdkMode参数值为jdbc或升级您的Flink版本。

权限说明

  • Flink通过JDBC模式消费Hologres Binlog支持使用Hologres自定义账号,通过Holohub模式不支持使用Hologres自定义账号。

  • Flink通过Holohub模式消费Hologres Binlog需要表的读写权限。

  • Flink通过JDBC模式消费Hologres Binlog需要如下前提条件,详情请参见通过JDBC消费Hologres Binlog

    1. 已创建hg_binlogExtension(Hologres V2.0版本起默认创建)。

    2. 用户为实例的Superuser或用户同时拥有目标表的Owner权限和实例的Replication Role权限。

Flink实时消费Binlog

VVP-2.4及以上版本支持Hologres Connector实时消费Binlog,使用方法如下。

源表DDL(非CDC模式)

该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是作为Insert类型的数据,可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。Hologres表开启Binlog后,在Flink中源表(非CDC模式)使用如下DDL可以实时消费Binlog。

create table test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
  • 三个binlogxxx 参数表示Binlog系统字段,命名和类型是固定的不能修改。

  • 其他字段是跟用户字段一一对应,必须是全小写。

源表DDL(CDC模式)

该模式下Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能完成表的数据的镜像同步,类似MySQL和Postgres的CDC功能。

说明

Hologres Binlog源表(CDC模式)暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见不支持定义Watermark,那如何进行窗口聚合?

Hologres表开启Binlog后,在Flink中源表(CDC模式)使用如下DDL可以实时消费Binlog。

create table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',//Hologres的DB名
  'tablename'='<yourTablename>',//Hologres的表名
  'username'='<yourAccessID>',//当前账号的access id
  'password'='<yourAccessSecret>',//当前账号的access key
  'endpoint'='<yourEndpoint>',//Hologres的vpc网络地址
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

全增量一体源表

VVR引擎1.13-vvr-4.0.13版本,Hologres实例0.10及以上版本开始,Hologres Binlog CDC源表支持全增量一体的消费,这种方式会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据,详情请参见实时数仓Hologres

JDBC模式Binlog源表

从Flink 6.0.3版本开始,支持通过JDBC模式消费Hologres Binlog,JDBC模式相比Holohub支持更多的数据类型和支持自定义账号,详情使用请参见实时数仓Hologres

阿里云首页 实时数仓Hologres 相关技术圈