本文为您介绍Hologres源表DDL定义、WITH参数、代码示例和类型映射。

说明 Hologres Connector可以作为Stream作业和Batch作业的源表使用。

什么是Hologres

Hologres兼容PostgreSQL协议,与大数据生态紧密连接,支持高并发、低延时实时分析处理PB级数据,让您轻松使用现有BI(Business Intelligence)工具对数据进行多维分析和业务探索。

前提条件

已创建Hologres表,详情请参见

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持Hologres Connector。
  • Hologres Connector不支持访问Hologres外部表。Hologres外部表详情请参见外部表
  • Flink默认以批模式读取Hologres源表数据,即只扫描一次Hologres全表,扫描结束,消费结束,新到Hologres源表的数据不会被读取。从VVR 3.0.0版本开始,支持实时消费Hologres数据。如何实时消费Hologres数据,请参见Flink实时消费Hologres
  • Hologres CDC模式暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式。详情请参见不支持定义Watermark,那如何进行窗口聚合?

DDL定义

create table hologres_source(
  name varchar,
  age BIGINT,
  birthday BIGINT
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' --该参数可选。
);
说明 Flink不支持在源表中定义计算列。

WITH参数

参数 说明 是否必填 备注
connector 源表类型。 固定值为hologres
dbname 数据库名称。 无。
tablename 表名称。
说明 如果Schema不为Public时,需要在表名称前面添加模式名称的前缀,即schema.tableName
无。
username 用户名,请填写阿里云账号的AccessKey ID。 无。
password 密码,请填写阿里云账号的AccessKey Secret。 无。
endpoint Hologres服务地址。 详情请参见访问域名
field_delimiter 导出数据时,不同行之间使用的分隔符。
说明 只有在bulkread = true时有效。
默认值为"\u0002"
binlog 是否消费Binlog数据。 参数取值如下:
  • true:消费Binlog数据。
  • false(默认值):不消费Binlog数据。
注意 仅VVP 2.4.0及以上版本支持该参数。
binlogMaxRetryTimes 读取Binlog数据出错后的重试次数。 默认值为60。
注意 仅VVP 2.4.0及以上版本支持该参数。
binlogRetryIntervalMs 读取Binlog数据出错后的重试时间间隔。 默认值为2000,单位为毫秒。
注意 仅VVP 2.4.0及以上版本支持该参数。
binlogBatchReadSize 批量读取Binlog的数据行数。 默认值为16。
注意 仅VVP 2.4.0及以上版本支持该参数。
cdcMode 是否采用CDC模式读取Binlog数据。 参数取值如下:
  • true:CDC模式读取Binlog数据。
  • false(默认值):非CDC模式读取Binlog数据。
注意 仅VVP 2.4.0及以上版本支持该参数。
binlogStartupMode Binlog数据消费模式。 参数取值如下:
  • initial:先全量消费数据,再读取Binlog开始增量消费。
  • earliestOffset(默认值): 从最早的Binlog开始消费。
  • timestamp: 从设置的startTime开始消费Binlog。
说明 仅实时计算引擎VVR 4.0.13及以上版本,Hologres 0.10及以上版本支持该参数。
startTime 启动位点的时间。如果没有设置该参数,且作业没有从state恢复,则从最早的Binlog开始消费Hologres数据。 格式为yyyy-MM-dd hh:mm:ss

代码示例

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' --该参数可选。
);

CREATE TEMPORARY TABLE blackhole_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT 
) with (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink
SELECT 
   name, age, birthday
from hologres_source;

类型映射

Hologres和Flink字段类型对应关系如下,建议使用该对应关系进行DDL声明。
Hologres字段类型 Flink字段类型
INT INT
INT[] ARRAY<INT>
BIGINT BIGINT
BIGINT[] ARRAY<BIGINT>
REAL FLOAT
REAL[] ARRAY<REAL>
DOUBLE PRECISION DOUBLE
DOUBLE PRECISION[] ARRAY<DOUBLE PRECISION>
BOOLEAN BOOLEAN
BOOLEAN[] ARRAY<BOOLEAN>
TEXT VARCHAR
TEXT[] ARRAY<TEXT>
NUMERIC DECIMAL
DATE DATE
TIMESTAMP WITH TIMEZONE(别名TIMESTAMPTZ) TIMESTAMP

Flink实时消费Hologres

VVP 2.4.0及以上版本,Hologres Connector支持实时消费Hologres。实时消费Hologres,即实时消费Hologres的Binlog数据。以下为您介绍Flink实时消费Hologres的详情:
  • 使用限制
    • Hologres V1.1及以上版本,可以根据业务需要选择开启或关闭Binlog能力,同时支持配置TTL满足不同业务场景对Binlog保留时间的诉求,详情请参见订阅Hologres Binlog
    • 已存在的表无法修改表属性开启Binlog,需要重新建表。
    • 不支持开启分区表父表的Binlog,请使用非分区表。
    • 暂不支持实时消费TIMESTAMP类型的数据,因此创建Hologres表时,请使用TIMESTAMPTZ类型。
    • 开启Binlog后,理论上列存表的开销要大于行存表的开销。因此,对于数据更新频繁的场景,建议使用存储格式为行存的表开启Binlog。
    • 目前,Binlog消费不支持数组类型,仅支持INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIMESTAMPTZ类型。
      说明 对不支持的数据类型(例如SMALLINT),即使不消费此字段,仍然可能导致作业无法上线。
  • 开启Binlog
    实时消费功能默认关闭,因此在Hologres控制台上创建表的DDL时,需要设置binlog.levelbinlog.ttl参数,示例如下。
    begin;
    create table test_message_src(
     id int primary key, 
     title text not null, 
     body text
    );
    call set_table_property('test_message_src', 'orientation', 'row');
    call set_table_property('test_message_src', 'clustering_key', 'id');
    call set_table_property('test_message_src', 'binlog.level', 'replica'); --自Hologres 1.1版本起,可以在建表后开启Binlog。
    call set_table_property('test_message_src', 'binlog.ttl', '86400'); 
    commit;
    其中,binlog.level设置为replica即代表开启Binlog,binlog.ttl为Binlog的TTL,单位为秒。
  • 注意事项
    • UPDATE操作会产生两条Binlog记录,分别为更新操作前和操作后的数据记录,因此您会消费到两条数据。但是,Hologres Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。
    • 建议Flink作业并发数和Hologres Table的Shard个数保持一致。
      您可以在Hologres控制台上,使用以下语句查看Table的Shard数,其中tablename为您的业务表名称。
      select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • 消费模式
    • 非CDC模式
      该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是INSERT类型的数据,您可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。源表DDL代码示例如下。
      create table test_message_src_binlog_table(
        hg_binlog_lsn BIGINT,
        hg_binlog_event_type BIGINT,
        hg_binlog_timestamp_us BIGINT,
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        'connector'='hologres',
        'dbname'='<yourDbname>',
        'tablename'='<yourTablename>',
        'username'='<yourAccessID>',
        'password'='<yourAccessSecret>',
        'endpoint'='<yourEndpoint>',
        'binlog' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '100'
      );
    • CDC模式
      该模式下Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER类型,这样就能完成表的数据的镜像同步,类似MySQL或Postgres的CDC功能。源表DDL代码示例如下。
      create table test_message_src_binlog_table(
        id INTEGER,
        title VARCHAR,
        body VARCHAR
      ) with (
        'connector'='hologres',
        'dbname'='<yourDbname>',
        'tablename'='<yourTablename>',
        'username'='<yourAccessID>',
        'password'='<yourAccessSecret>',
        'endpoint'='<yourEndpoint>',
        'binlog' = 'true',
        'cdcMode' = 'true',
        'binlogMaxRetryTimes' = '10',
        'binlogRetryIntervalMs' = '500',
        'binlogBatchReadSize' = '100'
      );
    • 全增量一体源表消费
      • 背景信息
        在源表Join维表时,由于Binlog的TTL等原因,会导致无法使用源表的所有数据。因为历史数据没有开Binlog,只有部分较新的数据开了Binlog。为了解决这个问题,之前的解决方案是为Binlog表设置一个很大的TTL,但这样会有以下问题:
        • 历史Binlog数据会被长时间保存,导致占用较多的存储资源。
        • 因为Binlog包含数据更新记录,使用Binlog进行全量消费会消费一些不必要的数据,导致占用较多的计算资源,且无法让用户只关注最新的数据。

        从VVR 4.0.13及以上版本,Hologres 0.10版本及以上开始,Hologres Binlog CDC源表支持全增量一体的消费,这种方式会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据。采用这种方式,可以解决上述问题。

      • 适用场景

        适用于历史数据不包含Binlog,但又希望消费所有数据的场景。

      • 使用限制
        • 仅适用于目标表有主键的场景,否则推荐您使用CDC模式的Hologres源表。
        • Hologres在1.1版本之后,支持按需开启Binlog,可以将已有历史数据的表打开Binlog。
      • 代码示例
        CREATE TABLE test_message_src_binlog_table(
          hg_binlog_lsn BIGINT,
          hg_binlog_event_type BIGINT,
          hg_binlog_timestamp_us BIGINT,
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) WITH (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'cdcMode' = 'true',
          'binlogStartUpMode' = 'initial', --先读取历史全量数据,再增量消费Binlog。
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
          );
  • 实现原理
    一条Binlog的字段由Binlog系统字段和用户Table字段组成,字段定义如下:
    字段名 字段类型 说明
    hg_binlog_lsn BIGINT Binlog系统字段,表示Binlog序号,Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。
    hg_binlog_event_type BIGINT Binlog系统字段,表示当前记录所表示的修改类型,参数取值如下:
    • INSERT=5:表示当前Binlog为插入一条新的记录。
    • DELETE=2:表示当前Binlog为删除一条已有的记录。
    • BEFORE_UPDATE=3:表示当前Binlog为更新操作前的记录。
    • AFTER_UPDATE=7:表示当前Binlog为更新操作后的记录。
    hg_binlog_timestamp_us BIGINT Binlog系统字段,系统时间戳,单位为μs。
    user_table_column_1 用户定义 用户的表字段。
    ... ... 用户的表字段。
    user_table_column_n 用户定义 用户的表字段。