Flink/Blink实时消费Hologres Binlog

本文将会为您介绍如何通过Flink和Blink实时消费Hologres Binlog。

注意事项

消费Hologres Binlog需要注意如下事项:

  • 仅Hologres V0.9及以上版本支持消费Hologres Binlog;仅Hologres V1.3.21及以上版本支持配置引擎白名单,HologresV1.3.21以下版本当前暂不支持配置引擎白名单,开启白名单后,会造成Binlog消费失败。如果您的实例版本低于所要求实例版本,请您加入Hologres钉钉群进行反馈,详情可参见在线支持

  • Hologres支持单表级别的Binlog功能,支持行存表和列存表,以及从Hologres V1.1版本开始支持行列共存表。开启Binlog后,理论上列存表的开销要大于行存表的开销。因此对于数据更新频繁的场景,建议为使用行存存储格式的表开启Binlog。

  • Hologres Binlog的支持情况以及开启、配置Hologres Binlog,请参见订阅Hologres Binlog

  • 仅阿里云Flink支持消费Hologres Binlog。Holohub模式下Flink消费Hologres Binlog只支持简单数据类型,从Flink 6.0.3版本开始,支持通过JDBC模式消费Hologres Binlog,相比Holohub,JDBC支持更多的数据类型,详情请参见Blink/Flink与Hologres的数据类型映射。同时增加了部分权限限制,详情请参见权限说明

  • 目前不支持消费分区父表的Binlog。

  • Hologres V2.0版本起有限支持Holohub模式;V2.1版本起下线Holohub模式,全面转为JDBC模式。在您升级Hologres版本前,请参考Holohub模式切换到JDBC模式,查看您当前正在使用Holohub模式的Flink任务,并按步骤升级Flink VVR作业版本,然后升级Hologres实例。

权限说明

  • Flink通过JDBC模式消费Hologres Binlog支持使用Hologres自定义账号,通过Holohub模式不支持使用Hologres自定义账号。

  • Flink通过Holohub模式消费Hologres Binlog需要表的读写权限。

  • Flink通过JDBC模式消费Hologres Binlog需要如下前提条件,详情请参见通过JDBC消费Hologres Binlog

    1. 已创建hg_binlogExtension(Hologres V2.0版本起默认创建)。

    2. 用户为实例的Superuser或用户同时拥有目标表的Owner权限和实例的Replication Role权限。

Flink实时消费Binlog

VVP-2.4及以上版本支持Hologres Connector实时消费Binlog,使用方法如下。

源表DDL(非CDC模式)

该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是作为Insert类型的数据,可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。Hologres表开启Binlog后,在Flink中源表(非CDC模式)使用如下DDL可以实时消费Binlog。

create table test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
  • 三个binlogxxx 参数表示Binlog系统字段,命名和类型是固定的不能修改。

  • 其他字段是跟用户字段一一对应,必须是全小写。

源表DDL(CDC模式)

该模式下Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型(INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能完成表的数据的镜像同步,类似MySQL和Postgres的CDC功能。

说明

Hologres Binlog源表(CDC模式)暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见MySQL/Hologres CDC源表不支持窗口函数,如何实现类似每分钟聚合统计的需求?

Hologres表开启Binlog后,在Flink中源表(CDC模式)使用如下DDL可以实时消费Binlog。

create table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',//Hologres的DB名
  'tablename'='<yourTablename>',//Hologres的表名
  'username'='<yourAccessID>',//当前账号的access id
  'password'='<yourAccessSecret>',//当前账号的access key
  'endpoint'='<yourEndpoint>',//Hologres的vpc网络地址
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

全增量一体源表

VVR引擎1.13-vvr-4.0.13版本,Hologres实例0.10及以上版本开始,Hologres Binlog CDC源表支持全增量一体的消费,这种方式会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据,详情请参见实时数仓Hologres

JDBC模式Binlog源表

从Flink 6.0.3版本开始,支持通过JDBC模式消费Hologres Binlog,JDBC模式相比Holohub支持更多的数据类型和支持自定义账号,详情使用请参见实时数仓Hologres

Holohub模式切换到JDBC模式

Hologres从V2.0版本起逐步下线Holohub模式。如果您需要升级Hologres版本,需要将Holohub模式的作业切换到JDBC模式。请参考如下方式进行。

Hologres实例升级为V2.1版本

您在升级Hologres实例版本到V2.1前,请选择如下两个方案之一,检查Flink任务与Hologres实例,以保障Flink任务正常运行。

  • (方案一)(推荐)将Flink VVR版本升级到8.0.7及以上版本,Flink会自动将Holohub模式切换为JDBC模式。

  • (方案二)将Flink VVR升级到6.0.7~8.0.5版本,在源表中添加参数'sdkMode'='jdbc'之后重新启动作业,同时需要授予用户如下权限选项中的其中之一,确认作业正常运行之后再对Hologres实例进行升级。

    • (选项一)实例的Superuser权限。

    • (选项二)目标表的Owner权限,CREATE DATABASE权限及实例的Replication Role权限。

  • (方案三)(不推荐)将Flink VVR版本升级至8.0.6,Flink会自动将Holohub模式切换为JDBC模式。但VVR 8.0.6版本存在已知缺陷,当维表字段过多时可能导致VVR上线超时,详情请参见Hologres Connector Release Note

  • (可选)如果您的Flink VVR作业数量较多,获取需要升级版本的作业和表信息请参见如下内容。

Hologres实例升级为V2.0版本

  • (方案一)(推荐)将Flink VVR版本升级到8.0.6及以上版本,Flink会自动将Holohub模式切换为JDBC模式,其中VVR 8.0.6版本存在已知缺陷,当维表字段过多时可能导致VVR作业上线超时,详情请参见Hologres Connector Release Note。建议选择VVR 8.0.7版本。

  • (方案二)将Flink VVR版本升级到8.0.4或8.0.5版本,并重启Flink作业,同时授予用户如下权限选项中的其中之一,确认作业正常运行之后再对Hologres实例进行升级。

    • (选项一)实例的Superuser权限。

    • (选项二)目标表的Owner权限,CREATE DATABASE权限,及实例的Replication Role权限。

  • (方案三)将Flink VVR版本升级到6.0.7到8.0.3版本,Flink会继续使用Holohub模式消费Binlog。

如果您的Flink VVR消费Hologres Binlog的作业过多,可以使用如下方式获取需要升级版本的作业和表信息。

说明

该工具仅支持获取如下作业信息:

  • 通过DDL方式进行表定义的SQL作业。

  • 通过Hints方式指定参数的Catalog作业。

不支持获取JAR作业信息,不支持获取没有Hints参数的Catalog表信息。

  1. 下载开源工具find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar,详情请参见find-incompatible-flink-jobs

  2. 使用本地命令行进入开源工具目录,然后运行如下命令,即可查看全部需要升级版本的作业和表信息。

    说明

    运行如下命令需要安装Java环境,使用JDK 8及以上版本。

    java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <url> <AccessKeyID> <AccessKeySecret> <binlog/rpc>
    
    # 使用示例
    java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs 北京 https://vvp.console.aliyun.com/web/xxxxxx/zh/#/workspaces/xxxx/namespaces/xxxx/operations/stream/xxxx my-access-key-id my-access-key-secret binlog

    参数说明如下:

    参数

    说明

    region

    目标实时计算Flink版项目空间所在地域的中文简称,取值请参见region取值对应表

    url

    目标实时计算Flink版项目任意一个作业的连接地址。

    AccessKeyID

    能访问实时计算Flink版项目空间的账号AccessKey ID。

    AccessKeySecret

    能访问实时计算Flink版项目空间的账号AccessKey Secret。

    binlog/rpc

    需要检查的作业内容,取值如下:

    • binlog:表示检查整个项目中所有作业的Hologres Binlog源表。

    • rpc:表示检查整个项目中所有作业使用了rpc模式的维表或结果表。

    region取值对应表(单击展开)

    地域

    取值

    华北2(北京)

    北京

    华东2(上海)

    上海

    华东1(杭州)

    杭州

    华南1(深圳)

    深圳

    华北3(张家口)

    张家口

    中国(香港)

    香港

    新加坡

    新加坡

    德国(法兰克福)

    德国

    印度尼西亚(雅加达)

    印度尼西亚

    马来西亚(吉隆坡)

    马来西亚

    美国(硅谷)

    美国

    上海金融云

    上海金融云

  3. 示例返回结果如下。

    image