本文将会为您介绍如何通过Flink和Blink实时消费Hologres Binlog。

注意事项

消费Hologres Binlog需要注意如下事项:
  • 仅Hologres V0.9及以上版本支持消费Hologres Binlog,如果您的实例是V0.9以下版本,请您提交工单或加入在线支持钉钉群申请升级实例,加入在线支持钉钉群请参见如何获取更多的在线支持?
  • Hologres支持单表级别的Binlog功能,支持行存表和列存表,以及从Hologres V1.1版本开始支持行列混存表。
  • Hologres Binlog的支持情况以及开启、配置Hologres Binlog,请参见订阅Hologres Binlog
  • 当前版本暂不支持配置引擎白名单,开启白名单后,会造成Binlog消费失败。
  • Binlog消费目前不支持数组类型,只支持以下数据类型:INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIMESTAMPTZ。对不支持的数据类型(如SMALLINT)即使不消费此字段,仍然可能导致作业无法上线。

Flink实时消费Binlog

Flink支持Hologres Connector实时消费Binlog,使用方法如下。

  1. 使用DDL语句创建源表
    • 使用语法
      • 源表 DDL(非CDC模式)
        该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是作为Insert类型的数据,可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据,具体示例如下。
        create table test_message_src_binlog_table(
          hg_binlog_lsn BIGINT,
          hg_binlog_event_type BIGINT,
          hg_binlog_timestamp_us BIGINT,
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
      • 源表 DDL(CDC模式)
        该模式下Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能完成表的数据的镜像同步,类似MySQL和Postgres的CDC功能。
        说明 Hologres Binlog源表(CDC模式)暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见不支持定义Watermark,那如何进行窗口聚合?
        Hologres表开启Binlog后,在Flink中源表(CDC模式)使用如下DDL可以实时消费Binlog。
        create table test_message_src_binlog_table(
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'cdcMode' = 'true',
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
      • 全增量一体源表

        VVR引擎1.13-vvr-4.0.13版本,Hologres实例0.10及以上版本开始,Hologres Binlog CDC源表支持全增量一体的消费,这种方式会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据,详情请参见Hologres源表

    • 参数说明

      在语法示例中,hg_binlog_xxx开头的三个字段表示Binlog的系统字段,命名和类型不支持修改。其余字段需要和用户字段一一对应,且必须为小写。更多参数说明如下表所示:

      参数名称 是否必填 说明
      connector 源表类型,值填写为hologres。
      dbname 读取的Hologres DB名称。
      tablename 读取的表名称。
      username 当前阿里云账号的AccessKey ID。您可以单击AccessKey 管理,获取AccessKey ID。
      password 当前阿里云账号的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。
      endpoint Hologres对应VPC的区域。您可以进入Hologres管理控制台,获取区域和端口信息。
      binlog 是否为Binlog source。如果需要消费,需要将binlog参数设置为true
      cdcmode 读取Binlog时是否采用CDC模式。如果是CDC模式,需要将cdcmode参数设置为true
      binlogMaxRetryTimes 读取Binlog出错重试次数,默认为60次。
      binlogRetryIntervalMs 读取Binlog出错重试间隔,默认为2000ms。
      binlogBatchReadSize 读取Binlog批量大小,默认为16个。
      startTime 启动位点的时间。如果没有设置该参数,且作业没有从状态恢复,则从最早的Binlog开始消费Hologres数据。格式为yyyy-MM-dd hh:mm:ss。
      说明 只要设置了此参数,binlogStartupMode都会按照timestamp生效。
      binlogStartupMode Binlog消费启动模式,推荐与CDC模式一同使用,取值如下。
      • timestamp:表示从设置的startTime开始消费Binlog。
      • initial:表示先全量消费数据,再读取Binlog开始增量消费。
      • earliestOffset(默认值):表示从最早的Binlog开始消费。
  2. 配置Binlog并发
    Binlog订阅的并发等于Hologres中Table的Shard个数,请执行如下语句查看Shard数。其中<tablename>请替换为您实际的Table名称。Binlog并发建议执行计划配置,将其并发数与Binlog对应的Hologres中Table的Shard数保持一致。
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  3. 消费延迟监控
    VVP-4.0.12版本起,支持通过监控指标EventTimeLag监控Flink读取Hologres Binlog的延迟状态,0表示无延迟。

Blink实时消费Binlog

实时计算Blink 3.7及以上版本,支持Hologres Connector实时消费Binlog,开启方法如下。

  1. 使用DDL语句创建源表
    • 使用语法
      create table test_message_src_binlog_table(
        hg_binlog_lsn BIGINT,
        hg_binlog_event_type BIGINT,
        hg_binlog_timestamp_us BIGINT,
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        type = 'hologres',
        'endpoint' = 'ip:port',--Hologres的vpc网络地址
        'username' = 'xxxx',--当前账号的AccessKey ID
        'password' = 'xxxx',--当前账号的AccessKey Secret
        'dbname' = 'xxxx',--Hologres的DB名
        'tablename' = 'xxxx',--Hologres的表名
        'binlog' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '256'
      );
    • 参数说明

      在语法示例中,hg_binlog_xxx开头的三个字段表示Binlog的系统字段,命名和类型不支持修改。其余字段需要和用户字段一一对应,且必须为小写。更多参数说明如下表所示:

      参数名称 是否必填 说明
      type 源表类型,值填写为hologres。
      dbname 读取的Hologres DB名称。
      tablename 读取的表名称。
      username 当前阿里云账号的AccessKey ID。您可以单击AccessKey 管理,获取AccessKey ID。
      password 当前阿里云账号的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。
      endpoint Hologres对应VPC的区域。您可以进入Hologres管理控制台,获取区域和端口信息。
      binlog 是否为Binlog source。如果需要消费,需要将binlog source设置为true
      binlogMaxRetryTimes 读取Binlog出错重试次数,默认为60次。
      binlogRetryIntervalMs 读取Binlog出错重试间隔,默认为2000ms。
      binlogBatchReadSize 读取Binlog批量大小,默认为16个。
      startTime 启动位点的时间。如果没有设置该参数,且作业没有从状态恢复,则从最早的Binlog开始消费Hologres数据。格式为yyyy-MM-dd hh:mm:ss。
  2. 配置Binlog并发
    Binlog订阅的并发等于Hologres中Table的Shard个数,请执行如下语句查看Shard数。其中<tablename>请替换为您实际的Table名称。Binlog并发建议执行计划配置,将其并发数与Binlog对应的Hologres中Table的Shard数保持一致。
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';