Flink全托管产品(Flink Serverless)是基于Apache Flink构建的全托管产品,为您提供全托管的实时计算服务。本文为您介绍在Flink全托管中如何使用Hologres维表。

使用限制

  1. 建议使用Hologres的行存表或者行列共存表,列存表对于点查场景性能开销较大。
    创建行存储表时必须设置主键,并且将主键配置为clustering key时性能较好,示例语句如下。
    begin;
    create table test(a int primary key, b text, c text, d float8, e int8);
    call set_table_property('test', 'orientation', 'row');
    call set_table_property('test', 'clustering_key', 'a');
    commit;
  2. 表的主键必须是Flink Join ON的字段,Flink Join ON的字段也必须是表的完整主键,两者必须完全匹配。

DDL定义

创建Hologres维表的DDL语句如下。
CREATE TABLE hologres_dim(
 id INT,
 len INT,
 content VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',  --Hologres的数据库名称。
  'tablename'='<yourTablename>',  --Hologres用于接收数据的表名称。
  'username'='<yourUsername>',  --当前阿里云账号的AccessKey ID。
  'password'='<yourPassword>',  --当前阿里云账号的AccessKey Secret。
  'endpoint'='<yourEndpoint>'  --当前Hologres实例VPC网络的Endpoint。
);
With参数的描述如下表所示。
参数 描述 是否必填
connector 维表类型。

固定值为hologres

dbname Hologres的数据库名称。
tablename Hologres用于接收数据的表名称。
username 当前阿里云账号的AccessKey ID。

您可以登录AccessKey 管理,获取AccessKey ID。

password 当前阿里云账号的AccessKey Secret。

您可以登录AccessKey 管理,获取AccessKey Secret。

endpoint Hologres的VPC网络地址。
您可以登录Hologres管控台,进入目标实例的详情页,在实例配置中获取Endpoint。Endpoint需包含端口号,格式为ip:port。
说明 如果Flink与Hologres实例部署在同一个地域,请使用VPC网络的网络地址。如果在不同地域,请使用公共网络的网络地址,并确保Flink集群能正常访问公网(公网网络延迟较高)。
useRpcMode Hologres Connector默认使用JDBC实现,可以通过该选项切换至老版本的Connector实现方式,即RPC模式。未来版本将不再支持RPC模式,因此不推荐使用该参数。

默认值为false

connectionSize 表示单个Flink维表Task所创建的JDBC连接池大小。

默认值为3,和吞吐成正比。

connectionPoolName 连接池名称,同一个TaskManager中,表配置同名的连接池名称可以共享连接池。

无默认值,每个表默认使用自己的连接池。如果设置连接池名称,则所有表的connectionSize需要相同,需要实时计算版本大于1.13-vvr-4.1.2。

async 表示是否采用异步方式同步数据。

异步模式可以并发地处理多个请求和响应,从而连续的请求之间不需要阻塞等待,提高查询的吞吐。但在异步模式下,无法保证请求的绝对顺序。

取值如下
  • true:表示异步同步数据。
  • false:默认值,表示不进行异步同步数据
说明 关异步请求的原理请参见维表 JOIN 与异步优化
jdbcReadBatchSize 表示维表点查最大批次大小。

默认值为128

jdbcReadBatchQueueSize 表示维表点查请求缓冲队列大小。

默认值为256

说明 jdbc开头的参数在Flink引擎1.13-vvr-4.0.11版本开始支持,其他JDBC参数请参见Hologres结果表

Cache参数

如果Hologres维表包含Cache参数,则可以参考如下参数描述。
参数 描述 是否必填
cache 缓存策略。
Hologres仅支持以下两种缓存策略:
  • None(默认值):无缓存。
  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果未找到,则去物理维表中查找。

    需要配置相关参数:缓存大小(cachesize)和缓存更新时间间隔(cachettlms)。

cachesize 缓存大小。

选择LRU缓存策略后,可以设置缓存大小,默认值为10000行。

cachettlms 更新缓存的时间间隔,单位为毫秒。

当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。

cacheempty 是否缓存join结果为空的数据。
取值如下:
  • true(默认值):表示缓存join结果为空的数据。
  • false:表示不缓存join结果为空的数据。

Hologres Catalog

Flink全托管支持Hologres Catalog,在Flink全托管控制台直接读取Hologres元数据,不用再手动注册Hologres表,可以提高作业开发的效率且能保证表结构的正确性,详情请参见管理Hologres Catalog

基于Hologres Catalog,目前全托管的Flink已经支持模型演进(schema evolution)以及整库同步能力,详情请参见CREATE TABLE AS(CTAS)语句CREATE DATABASE AS(CDAS)语句

使用示例

创建Hologres维表并接收Flink的数据,示例语句如下。
CREATE TEMPORARY TABLE datagen_source (
   a INT,
   b BIGINT,
   c STRING,
   proctime AS PROCTIME()
) with (
   'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
   a INT, 
   b VARCHAR, 
   c VARCHAR
) with (
   'connector' = 'hologres',
   ...
);

CREATE TEMPORARY TABLE blackhole_sink (
   a INT,
   b STRING
) with (
   'connector' = 'blackhole'
);

insert into blackhole_sink select T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Hologres DataStream Connector

如果您通过DataStream的方式读写Hologres数据,则需要使用Hologres DataStream Connector连接Flink全托管,详情请参见Hologres DataStream Connector

类型映射

Flink全托管与Hologres的数据类型映射,请参见数据类型汇总