Flink全托管产品(Flink Serverless)是基于Apache Flink构建的全托管产品,为您提供全托管的实时计算服务。本文为您介绍Flink全托管如何实时写入数据至Hologres结果表。
DDL定义
create table hologres_sink(
name varchar,
age BIGINT,
birthday BIGINT
) with (
'connector'='hologres',
'dbname'='<yourDbname>', --Hologres的数据库名称。
'tablename'='<yourTablename>', --Hologres用于接收数据的表名称。
'username'='<yourUsername>', --当前阿里云账号的AccessKey ID。
'password'='<yourPassword>', --当前阿里云账号的AccessKey Secret。
'endpoint'='<yourEndpoint>', --当前Hologres实例VPC网络的Endpoint。
);
With参数的描述如下表所示。- jdbcWriteBatchSize和jdbcWriteFlushInterval之间为或的关系,即当同时设置了两个参数,满足其中一个,就进行写入。
- 仅实时计算引擎Flink 1.13-VVR 4.0.11及以上版本支持所有
jdbc
开头的参数。
参数 | 描述 | 是否必填 | 备注 |
---|---|---|---|
connector | 结果表类型。 | 是 |
固定值为hologres。 |
dbname | Hologres的数据库名称。 | 是 | 不涉及。 |
tablename | Hologres用于接收数据的表名称。如果Schema不为Public时,需要在表名称前面添加模式名称的前缀,即schemaName.tableName 。
|
是 | 不涉及。 |
username | 当前阿里云账号的AccessKey ID。 | 是 |
您可以登录AccessKey 管理,获取AccessKey ID。 |
password | 当前阿里云账号的AccessKey Secret。 | 是 |
您可以登录AccessKey 管理,获取AccessKey Secret。 |
endpoint | Hologres的VPC网络地址。
说明 如果Flink与Hologres实例部署在同一个地域,请使用VPC网络的网络地址。如果在不同地域,请使用公共网络的网络地址,并确保Flink集群能正常访问公网(公网网络延迟较高)。
|
是 |
您可以登录Hologres管控台,进入目标实例的详情页,在网络信息中获取Endpoint。Endpoint需包含端口号,格式为 |
sdkMode | SDK模式。
说明 从VVP 6.0.3版本开始,Hologres Connector的模式设置统一更名为
sdkMode ,若VVP版本低于6.0.3,请使用原参数useRpcMode 。
|
否 |
参数取值如下:
|
field_delimiter/arraydelimiter | Hologres Sink支持将一个STRING字段按照field_delimiter切分为数组导入Hologres。 | 否 |
默认值为"\u0002"。 |
mutatetype | 数据写入模式,详情请参见流式语义。 | 否 |
默认值为insertorignore。 |
ignoredelete | 是否忽略回撤消息。取值如下:
通常Flink的Group By会产生回撤消息,回撤消息传输到Hologres Connector时会产生Delete请求。 |
否 |
默认值为true。
说明 仅在使用流式语义时生效。
|
partitionrouter | 是否写入分区表。取值如下:
|
否 |
默认值为false。 |
createparttable | 当写入分区表时,是否根据分区值自动创建分区表。仅Flink全托管2.1.X及以上版本支持自动创建分区表。取值如下:
|
否 |
默认值为false。
重要 请您谨慎使用该功能,确保分区值不会出现脏数据,导致创建了错误的分区表。
|
ignoreNullWhenUpdate | 当mutatetype='insertOrUpdate' 时,是否忽略更新写入数据中的Null值。
|
否 |
默认值为false。 |
useRpcMode | Hologres Connector默认使用JDBC实现,可以通过该选项切换至老版本的Connector的RPC模式。
参数取值如下:
|
否 |
默认值为false。 |
connectionSize | 表示单个Flink结果表Task所创建的JDBC连接池大小。 | 否 |
默认值为3,和吞吐成正比。 关于连接数的使用和优化请参见连接数使用说明。 |
connectionPoolName | 连接池名称,同一个TaskManager中,表配置同名的连接池名称可以共享连接池。 | 否 |
无默认值,每个表默认使用自己的连接池。如果设置连接池名称,则所有表的connectionSize需要相同,需要实时计算版本大于1.13-vvr-4.1.12。 |
jdbcWriteBatchSize | 表示Hologres Sink节点数据攒批的最大批大小。 | 否 |
默认值为256。 |
jdbcWriteBatchByteSize | 表示Hologres Sink节点数据攒批的最大字节大小。
说明 jdbcWriteBatchSize和jdbcWriteFlushInterval之间为或的关系,即当同时设置了两个参数,满足其中一个,就进行写入。
|
否 |
默认值为20971520(2 * 1024 * 1024),即2MB。 |
jdbcWriteFlushInterval | 表示Hologres Sink节点数据攒批的最长Flush等待时间。
说明 jdbcWriteBatchSize和jdbcWriteFlushInterval之间为或的关系,即当同时设置了两个参数,满足其中一个,就进行写入。
|
否 |
默认值为10000,即10秒 |
jdbcEnableDefaultForNotNullColumn | Hologres表中not null且没有设置默认值的字段,是否允许写入null值。
参数取值如下:
|
否 | 默认值为true。 |
jdbcRetryCount | 表示当连接故障时,写入和查询的重试次数。 | 否 |
默认值为10。 |
jdbcRetrySleepInitMs | 表示每次重试的等待时间,公式为retrySleepInitMs + retry * retrySleepStepMs。 | 否 |
默认值为1000ms。 |
jdbcRetrySleepStepMs | 表示每次重试的等待时间,公式为retrySleepInitMs + retry * retrySleepStepMs。 | 否 |
默认值为5000ms。 |
jdbcConnectionMaxIdleMs | 表示写入线程和点查线程数据库连接的最大Idle时间,超过连接将被释放。 | 否 |
默认值为60000,即60秒。 |
jdbcMetaCacheTTL | 表示元信息(TableSchema)的本地缓存时间。 | 否 |
默认值为60000,即60秒。 |
jdbcMetaAutoRefreshFactor | 表示当元信息缓存剩余存活时间短于 metaCacheTTL、metaAutoRefreshFactor将自动刷新缓存。 | 否 |
默认值为-1,表示不自动刷新。 |
流式语义
流处理,也称为流数据或流事件处理,即对一系列无界数据或事件连续处理。执行流数据或流事件处理的系统通常允许您指定一种可靠性模式或处理语义,保证整个系统处理数据的准确性,因为网络或设备故障等可能会导致数据丢失。
- Exactly-once(仅一次):即使在发生各种故障的情况下,系统只处理一次数据或事件。
- At-least-once(至少一次):如果在系统完全处理之前丢失了数据或事件,则从源头重新传输,因此可以多次处理数据或事件。如果第一次重试成功,则不进行后续重试。
- 如果Hologres物理表未设置主键,则Hologres Sink使用At-least-once语义。
- 如果Hologres物理表已设置主键,则Hologres Sink通过主键确保使用Exactly-once语义。当同主键数据出现多次时,您需要设置mutatetype参数确定更新结果表的方式,mutatetype取值如下:
- insertorignore(默认值):保留首次出现的数据,忽略后续所有数据。
- insertorreplace:整行替换已有数据。
- insertorupdate:替换部分已有数据。例如一张表有a、b、c和d四个字段,a是主键PK(Primary Key),写入Hologres时只写入a和b两个字段,在主键重复的情况下,系统只会更新b字段,c和d保持不变。
说明- 当mutatetype设置为insertorupdate或insertorreplace时,系统根据主键更新数据。
- Flink全托管定义的结果表中的数据列数不一定要和Hologres物理表的列数一致,您需要保证缺失的列没有非空约束,即列值可以为Null,否则会报错。
- 默认情况下,Hologres Sink只能向一张表导入数据。如果导入数据至分区表的父表,即使导入成功,也会查询数据失败。您可以设置参数partitionrouter为true,开启自动将数据路由到对应分区表的功能。注意事项如下:
- tablename参数需要填写为分区表父表的表名称。
- 如果没有提前创建分区表,需要设置createparttable参数为true,从而支持自动创建分区表,否则会导入失败。
如何使用宽表Merge/局部更新功能
- 使用方法
对于常见的多个流的数据写入至一张Hologres宽表的场景,具体使用方法如下:
假设Hologres有一张宽表WIDE_TABLE,有A、B、C、D、E几列,其中A字段是主键,Flink一个流包含数据A、B、C,另一个流包含数据A、D、E。- 使用Flink SQL声明两张Hologres结果表,其中一张表只声明字段A、B、C,另一张表只声明字段A、D、E,这两张表都映射至WIDE_TABLE。
- 两张结果表的mutatetype属性都设置成insertorupdate。
- 两张结果表的ignoredelete属性都设置成true,防止回撤消息产生Delete请求。
- 将两个流的数据分别Insert至两张结果表中。
- 使用限制
该场景的具体使用限制如下:
- 宽表必须有主键。
- 每个流的数据都必须包含完整主键字段。
- 列存表的宽表Merge场景在高RPS的情况下,CPU使用率会偏高,建议关闭表中字段的Dictionary encoding。
- 使用示例
Hologres建表语句如下,包含6个字段,其中字段
a
为主键。
Flink结果表相关DDL如下。BEGIN; CREATE TABLE wide_table( a BIGINT primary key, b TEXT, c BIGINT, d TEXT, e BIGINT, f FLOAT8 ); call set_table_property('wide_table', 'dictionary_encoding_columns', 'b:off'); COMMIT;
-- 根据主键a更新或插入b,c字段 CREATE TEMPORARY TABLE hologres_sink_1( a varchar primary key, b BIGINT, c BIGINT ) with ( 'connector'='hologres', 'dbname'='<yourDbname>', 'tablename'='wide_table', //Hologres接收数据的宽表表名 'username'='<yourUsername>', 'password'='<yourPassword>', 'endpoint'='<yourEndpoint>', 'ignoredelete'='true', // 忽略回撤 'mutatetype'='insertorupdate' //使用insertorupdate参数,更新部分列 ); -- 根据主键a更新或插入d,e字段 CREATE TEMPORARY TABLE hologres_sink_2( a varchar primary key, d BIGINT, e varchar ) with ( 'connector'='hologres', 'dbname'='<yourDbname>', 'tablename'='wide_table', //Hologres接收数据的宽表表名 'username'='<yourUsername>', 'password'='<yourPassword>', 'endpoint'='<yourEndpoint>', 'ignoredelete'='true', // 忽略回撤 'mutatetype'='insertorupdate' //使用insertorupdate参数,更新部分列 ); -- 写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。 BEGIN STATEMENT SET; INSERT INTO hologres_sink_1 SELECT a,b,c FROM source_1; INSERT INTO hologres_sink_2 SELECT a,d,e FROM source_2; END;
Hologres Catalog
Flink全托管支持Hologres Catalog,在Flink全托管控制台直接读取Hologres元数据,不用再手动注册Hologres表,可以提高作业开发的效率且能保证表结构的正确性,详情请参见管理Hologres Catalog。
基于Hologres Catalog,目前全托管的Flink已经支持模型演进(schema evolution)以及整库同步能力,详情请参见CREATE TABLE AS(CTAS)语句和CREATE DATABASE AS(CDAS)语句。
Hologres DataStream Connector
如果您通过DataStream的方式读写Hologres数据,则需要使用Hologres DataStream Connector连接Flink全托管,详情请参见Hologres DataStream Connector。
类型映射
Flink全托管与Hologres的数据类型映射,请参见数据类型汇总。Connector Release Note
版本说明请参见Hologres Connector Release Note。