本文为您介绍开源Flink 1.11如何实时写入数据至Hologres。
前提条件
背景信息
从开源Flink1.11版本开始,Hologres代码已开源,相应版本的Connector已经在中央仓库发布Release包,可在项目中参照如下pom文件进行配置。详细内容请参见Hologres GitHub官方库。
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>hologres-connector-flink-1.15</artifactId>
<version>1.4.0</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
开源Flink版本与hologres-connector-flink
最新版本对应关系如下,建议使用1.15及以上版本,功能更丰富:
Flink版本 | Connector版本 |
Flink 1.11 | hologres-connector-flink-1.11:1.0.1 |
Flink 1.12 | hologres-connector-flink-1.12:1.0.1 |
Flink 1.13 | hologres-connector-flink-1.13:1.3.2 |
Flink 1.14 | hologres-connector-flink-1.14:1.3.2 |
Flink 1.15 | hologres-connector-flink-1.15:1.4.1 |
Flink 1.17 | hologres-connector-flink-1.17:1.4.1 |
Flink SQL写入数据至Hologres代码示例
您可以参照如下代码示例,通过将Flink SQL将数据写入Hologres。其中,更多详细的代码示例请参见Hologres GitHub官方库。
String createHologresTable =
String.format(
"create table sink("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s'"
+ ")",
database, tableName, userName, password, endPoint);
tEnv.executeSql(createHologresTable);
createScanTable(tEnv);
tEnv.executeSql("insert into sink select * from source");
更多详尽的代码示例请参见hologres-connector-flink-examples,包括如下示例。
FlinkSQLToHoloExample:一个使用纯Flink SQL接口实现的应用,将数据写入至Hologres。
FlinkDSAndSQLToHoloExample:一个混合Flink DataStream以及SQL 接口实现的应用,写入Hologres前,将DataStream转换成Table,之后再用Flink SQL写入Hologres。
FlinkDataStreamToHoloExample:一个使用纯Flink DataStream接口实现的应用,将数据写入至Hologres。
FlinkRoaringBitmapAggJob:一个使用Flink及RoaringBitmap,结合Hologres维表,实现实时去重统计UV的应用,并将统计结果写入Hologres。
通过Flink DataStream接口的实时数据写入方法,可以对数据进行基于Hologres Shard的Repartition操作,有效减少写入Hologres实例时的小文件数量,从而提升写入性能并降低系统负载。该方法适用于需要批量导入具有主键的空表,实现类似Insert Overwrite的场景。
Hologres Flink Connector参数说明
您可以将Flink数据写入Hologres,Hologres Flink Connector相关参数具体内容如下:
参数 | 是否必填 | 说明 |
connector | 是 | 结果表类型,固定值为hologres。 |
dbname | 是 | Hologres的数据库名称。 |
tablename | 是 | Hologres接收数据的表名称。 |
username | 是 | 当前阿里云账号的AccessKey ID。 您可以单击AccessKey 管理,获取AccessKey ID。 |
password | 是 | 当前阿里云账号的AccessKey Secret。 您可以单击AccessKey 管理,获取AccessKey Secret。 |
endpoint | 是 | Hologres的VPC网络地址。进入Hologres管理控制台的实例详情页,获取Endpoint。 说明 endpoint需包含端口号,格式为 |
连接参数
参数
是否必填
说明
connectionSize
否
单个Flink Hologres Task所创建的JDBC连接池大小。
默认值:3,和吞吐成正比。
connectionPoolName
否
连接池名称,同一个TaskManager中,表配置同名的连接池名称可以共享连接池。
无默认值,每个表默认使用自己的连接池。如果设置连接池名称,则所有表的connectionSize需要相同
fixedConnectionMode
否
写入和点查不占用连接数(beta功能,需要connector版本>=1.2.0,hologres引擎版本>=1.3)
默认值:false
jdbcRetryCount
否
当连接故障时,写入和查询的重试次数。
默认值:10。
jdbcRetrySleepInitMs
否
每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs。
默认值:1000ms。
jdbcRetrySleepStepMs
否
每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs。
默认值:5000ms。
jdbcConnectionMaxIdleMs
否
写入线程和点查线程数据库连接的最大Idle时间,超过连接将被释放。
默认值:60000ms。
jdbcMetaCacheTTL
否
TableSchema信息的本地缓存时间。
默认值:60000ms。
jdbcMetaAutoRefreshFactor
否
当TableSchema cache剩余存活时间短于
metaCacheTTL/metaAutoRefreshFactor
将自动刷新cache。默认值:-1,表示不自动刷新。
connection.ssl.mode
否
是否启用数据传输加密。取值说明如下:
disable(默认值):不启用传输加密。
require:启用SSL,只对数据链路加密。
verify-ca:启用SSL,加密数据链路,同时使用CA证书验证Hologres服务端的真实性。
verify-full:启用SSL,加密数据链路,使用CA证书验证Hologres服务端的真实性,同时比对证书内的CN或DNS与连接时配置的Hologres连接地址是否一致。
connection.ssl.root-cert.location
否
CA证书的路径,且确保已经将CA证书上传至Flink集群环境。
说明当connection.ssl.mode参数配置为verify-ca或verify-full时,需要配置此参数。
jdbcDirectConnect
否
是否开启直连。取值说明如下:
false(默认值):不开启。
true:开启。
Flink批量写入的瓶颈是VIP Endpoint的网络吞吐。开启此参数会测试当前环境能否直连Hologres FE,若支持默认使用直连。
写入参数
参数
是否必填
说明
mutatetype
否
数据写入模式,详情请参见流式语义。
默认值:insertorignore。
ignoredelete
否
是否忽略撤回消息。通常Flink的Group By会产生回撤消息,回撤消息到Hologres connector会产生Delete请求。
默认值:true,仅在使用流式语义时生效。
createparttable
否
当写入分区表时,是否自动根据分区值自动创建分区表。
false(默认值):不会自动创建。
true:自动创建。
建议慎用该功能,确保分区值不会出现脏数据导致创建错误的分区表。
ignoreNullWhenUpdate
否
当
mutatetype='insertOrUpdate'
时,是否忽略更新写入数据中的Null值。默认值:false。
jdbcWriteBatchSize
否
Hologres Sink节点数据攒批的最大批大小。
默认值:256
jdbcWriteBatchByteSize
否
Hologres Sink节点单个线程数据攒批的最大字节大小。
默认值:2097152(2 * 1024 * 1024),2MB。
jdbcWriteBatchTotalByteSize
否
Hologres Sink节点所有数据攒批的最大字节大小。
默认值:20971520(20 * 1024 * 1024),20MB。
jdbcWriteFlushInterval
否
Hologres Sink节点数据攒批的最长Flush等待时间。
默认值:10000,即10秒。
jdbcUseLegacyPutHandler
否
写入SQL格式,取值说明如下:
true:写入SQL格式为
insert into xxx(c0,c1,...) values (?,?,...),... on conflict;
。false(默认值):写入SQL格式为
insert into xxx(c0,c1,...) select unnest(?),unnest(?),... on conflict
。
jdbcEnableDefaultForNotNullColumn
否
设置为true时,not null且未在表上设置default的字段传入null时,将以默认值写入。String类型默认为"",Number类型默认为0,Date、timestamp、timestamptz 类型默认为1970-01-01 00:00:00。
默认值:true。
remove-u0000-in-text.enabled
否
是否自动替换TEXT数据类型中的非UTF-8的u0000字符。取值说明如下:
true:替换。
false(默认值):不替换。
deduplication.enabled
否
若一批写入的数据中有主键相同的数据,是否进行去重。取值说明如下:
true(默认值):进行去重,只保留后到达的这条数据。
false:不进行去重。
首先将批处理数据写入,待写入完成后再继续写入后到达的数据。
说明在不进行去重时,极端情况下(例如所有数据的主键都相同),写入操作会退化为非批量写入,对性能会产生一定影响。
aggressive.enabled
否
是否启用激进提交模式。取值说明如下:
true:启用。
启用后,即使批量处理没有达到预期的条数,只要检测到连接空闲,系统就会强制提交数据。在流量较小时,这种方法可以有效减少数据传输的延时。
false(默认值):不启用。
jdbcCopyWriteMode
否
数据写入方式。取值说明如下:
true:使用Copy方式写入,Copy写入方式分为流式Copy(Fixed Copy)和批量Copy,当前默认使用流式Copy(Fixed Copy)方式写入。
说明与使用INSERT方式写入相比,Fixed Copy方式可以实现更高的吞吐(因为采用流模式),更低的数据延时以及更低的客户端内存消耗(因为不需要攒批数据),但不支持数据回撤功能。
false(默认值):使用INSERT方式写入。
说明仅Hologres V1.3.1及以上版本支持此参数。
jdbcCopyWriteFormat
否
底层是否采用二进制协议。
binary(默认值):表示使用二进制模式,二进制会更快。
text:为文本模式。
说明仅Hologres V1.3.1及以上版本支持此参数。
bulkLoad
是否使用批量Copy方式写入。取值说明如下:
true:使用批量Copy方式写入,仅jdbcCopyWriteMode参数也设置为true时,该参数才会生效,否则使用Fixed Copy方式写入。
说明批量Copy相较于流式Copy(Fixed Copy),具备更高的效率,能更好地利用Hologres的资源,从而在数据写入过程中提供更优的性能,您可以根据业务需要,选择合适的数据写入方式。
在对主键表进行批量Copy写入时,通常会出现表锁的情况,您可以通过配置target-shards.enabled参数为true,将写入锁粒度降至Shard级别,从而允许并发执行多个批量导入任务,减少了表锁的发生。相比Fixed Copy模式,批量Copy写入有主键表时,通过这种方式能够显著降低Hologres实例的负载,实测显示,可以减少约66.7%的负载。
批量Copy写入时,如果目标表包含主键,要求在写入之前目标表为空表,否则写入过程中进行主键去重会影响写入性能。
false(默认值):不使用。
说明仅Hologres V1.4.0及以上版本支持此参数。
target-shards.enabled
是否启用Target Shard批量写入。取值说明如下:
true:启用Target Shard批量写入,当源数据已按Shard重新分区时,可以将写入锁粒度降至Shard级别。
false(默认值):不启用。
说明仅Hologres V1.4.1及以上版本支持此参数。
点查参数
参数
是否必填
说明
jdbcReadBatchSize
否
维表点查最大批次大小。
默认值:128。
jdbcReadBatchQueueSize
否
维表点查请求缓冲队列大小。
默认值:256。
async
否
是否采用异步方式同步数据。
默认值:false。异步模式可以并发地处理多个请求和响应,从而连续的请求之间不需要阻塞等待,提高查询的吞吐。但在异步模式下,无法保证请求的绝对顺序。
cache
否
缓存策略。
默认值:None。Hologres仅支持以下两种缓存策略:None:无缓存。LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果未找到,则去物理维表中查找。
cachesize
否
缓存大小。
默认值:10000。选择LRU缓存策略后,可以设置缓存大小。
cachettlms
否
更新缓存的时间间隔,单位为毫秒。
当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。
cacheempty
否
是否缓存join结果为空的数据。
默认值:true,表示缓存join结果为空的数据。false:表示不缓存join结果为空的数据。
数据类型映射
当前Flink全托管与Hologres的数据类型映射请参见Blink/Flink与Hologres的数据类型映射。