Flink全托管产品(Flink Serverless)是基于Apache Flink构建的全托管产品,为您提供全托管的实时计算服务。本文为您介绍在Flink全托管中如何使用Hologres维表。
使用限制
- 建议使用Hologres的行存表,列存表对于点查场景性能开销较大。
创建行存储表时必须设置主键,并且将主键配置为clustering key时才可以工作,示例语句如下。
begin; create table test(a int primary key, b text, c text, d float8, e int8); call set_table_property('test', 'orientation', 'row'); call set_table_property('test', 'clustering_key', 'a'); commit;
- 表的主键必须是Flink Join ON的字段,Flink Join ON的字段也必须是表的完整主键,两者必须完全匹配。
- Hologres Blink Connector的维表功能不支持一对多的输出。
DDL定义
创建Hologres维表的DDL语句如下。
CREATE TABLE hologres_dim(
id INT,
len INT,
content VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>', --Hologres的数据库名称。
'tablename'='<yourTablename>', --Hologres用于接收数据的表名称。
'username'='<yourUsername>', --当前阿里云账号的AccessKey ID。
'password'='<yourPassword>', --当前阿里云账号的AccessKey Secret。
'endpoint'='<yourEndpoint>' --当前Hologres实例VPC网络的Endpoint。
);
With参数的描述如下表所示。参数 | 描述 | 是否必填 |
---|---|---|
connector | 维表类型。
固定值为hologres。 |
是 |
dbname | Hologres的数据库名称。 | 是 |
tablename | Hologres用于接收数据的表名称。 | 是 |
username | 当前阿里云账号的AccessKey ID。
您可以登录AccessKey 管理,获取AccessKey ID。 |
是 |
password | 当前阿里云账号的AccessKey Secret。
您可以登录AccessKey 管理,获取AccessKey Secret。 |
否 |
endpoint | Hologres的VPC网络地址。
您可以登录Hologres管控台,进入目标实例的详情页,在实例配置中获取Endpoint。Endpoint需包含端口号,格式为ip:port。
说明 如果Flink与Hologres实例部署在同一个地域,请使用VPC网络的网络地址。如果在不同地域,则请使用公共网络的网络地址。
|
是 |
Cache参数
如果Hologres维表包含Cache参数,则可以参考如下参数描述。
参数 | 描述 | 是否必填 |
---|---|---|
cache | 缓存策略
Hologres仅支持以下两种缓存策略:
|
否 |
cachesize | 缓存大小
选择 |
否 |
cachettlms | 更新缓存的时间间隔。
当选择 |
否 |
async | 是否异步同步数据。
取值如下:
|
否 |
cacheempty | 是否缓存join结果为空的数据。
取值如下:
|
否 |
使用示例
创建Hologres维表并接收Flink的数据,示例语句如下。
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) with (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) with (
'connector' = 'hologres',
...
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) with (
'connector' = 'blackhole'
);
insert into blackhole_sink select T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
在文档使用中是否遇到以下问题
更多建议
匿名提交