本文将会为您介绍如何通过Flink和Blink实时消费Hologres Binlog。

注意事项

消费Hologres Binlog需要注意如下事项:
  • 仅Hologres V0.9及以上版本支持消费Hologres Binlog,如果您的实例是V0.9以下版本,请您提交工单或加入在线支持钉钉群申请升级实例,加入在线支持钉钉群请参见如何获取更多的在线支持?
  • Hologres支持单表级别的Binlog功能,支持行存表和列存表,以及从Hologres V1.1版本开始支持行列混存表。
  • Hologres Binlog的支持情况以及开启、配置Hologres Binlog,请参见订阅Hologres Binlog

Flink实时消费Binlog

Flink VVP-2.4及以上版本,支持Hologres Connector实时消费Binlog,使用方法如下。

  1. 使用DDL语句创建源表
    • 使用语法
      • 源表 DDL(非CDC模式)
        该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是作为Insert类型的数据,可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据,具体示例如下。
        create table test_message_src_binlog_table(
          hg_binlog_lsn BIGINT,
          hg_binlog_event_type BIGINT,
          hg_binlog_timestamp_us BIGINT,
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
      • 源表 DDL(CDC模式)
        该模式下Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能完成表的数据的镜像同步,类似MySQL和Postgres的CDC功能,具体示例如下。
        create table test_message_src_binlog_table(
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'cdcMode' = 'true'
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
    • 参数说明

      在语法示例中,hg_binlog_xxx开头的三个字段表示Binlog的系统字段,命名和类型不支持修改。其余字段需要和用户字段一一对应,且必须为小写。更多参数说明如下表所示:

      参数名称 是否必填 说明
      connector 源表类型,值填写为hologres。
      dbname 读取的Hologres DB名称。
      tablename 读取的表名称。
      username 当前阿里云账号的AccessKey ID。您可以单击AccessKey 管理,获取AccessKey ID。
      password 当前阿里云账号的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。
      endpoint Hologres对应VPC的区域。您可以进入Hologres管理控制台,获取区域和端口信息。
      binlog 是否为Binlog source。如果需要消费,需要将binlog参数设置为true
      cdcmode 读取Binlog时是否采用CDC模式。如果是CDC模式,需要将cdcmode参数设置为true
      binlogMaxRetryTimes 读取Binlog出错重试次数,默认为60次。
      binlogRetryIntervalMs 读取Binlog出错重试间隔,默认为2000ms。
      binlogBatchReadSize 读取Binlog批量大小,默认为16个。
      startTime 启动位点的时间。如果没有设置该参数,且作业没有从状态恢复,则从最早的Binlog开始消费Hologres数据。格式为yyyy-MM-dd hh:mm:ss。
  2. 配置Binlog并发
    Binlog订阅的并发等于Hologres中Table的Shard个数,请执行如下语句查看Shard数。其中,<tablename>请替换为您实际的Table名称。Binlog并发建议执行计划配置,将其并发数与Binlog对应的Hologres中Table的Shard数保持一致。
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';

Blink实时消费Binlog

实时计算Blink 3.7及以上版本,支持Hologres Connector实时消费Binlog,开启方法如下。

  1. 使用DDL语句创建源表
    • 使用语法
      create table test_message_src_binlog_table(
        hg_binlog_lsn BIGINT,
        hg_binlog_event_type BIGINT,
        hg_binlog_timestamp_us BIGINT,
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        type = 'hologres',
        'endpoint' = 'ip:port',--Hologres的vpc网络地址
        'username' = 'xxxx',--当前账号的AccessKey ID
        'password' = 'xxxx',--当前账号的AccessKey Secret
        'dbname' = 'xxxx',--Hologres的DB名
        'tablename' = 'xxxx',--Hologres的表名
        'binlog' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '256'
      );
    • 参数说明

      在语法示例中,hg_binlog_xxx开头的三个字段表示Binlog的系统字段,命名和类型不支持修改。其余字段需要和用户字段一一对应,且必须为小写。更多参数说明如下表所示:

      参数名称 是否必填 说明
      type 源表类型,值填写为hologres。
      dbname 读取的Hologres DB名称。
      tablename 读取的表名称。
      username 当前阿里云账号的AccessKey ID。您可以单击AccessKey 管理,获取AccessKey ID。
      password 当前阿里云账号的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。
      endpoint Hologres对应VPC的区域。您可以进入Hologres管理控制台,获取区域和端口信息。
      binlog 是否为Binlog source。如果需要消费,需要将binlog source设置为true
      binlogMaxRetryTimes 读取Binlog出错重试次数,默认为60次。
      binlogRetryIntervalMs 读取Binlog出错重试间隔,默认为2000ms。
      binlogBatchReadSize 读取Binlog批量大小,默认为16个。
      startTime 启动位点的时间。如果没有设置该参数,且作业没有从状态恢复,则从最早的Binlog开始消费Hologres数据。格式为yyyy-MM-dd hh:mm:ss。
  2. 配置Binlog并发
    Binlog订阅的并发等于Hologres中Table的Shard个数,请执行如下语句查看Shard数。其中,<tablename>请替换为您实际的Table名称。Binlog并发建议执行计划配置,将其并发数与Binlog对应的Hologres中Table的Shard数保持一致。
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';