Hologres推出的新版Flink Connector插件,支持通过Flink将数据批量导入到Hologres,实现高效且低负载的数据导入。
背景信息
在大数据处理领域,Hologres作为一款强大的在线分析处理(OLAP)系统,与Flink的集成提供了强大的实时数据流处理能力。然而,对数据时效性要求不高的场景,如历史数据的批量加载、离线数据处理或日志聚合等任务,推荐使用Flink批量导入Hologres。批量导入能够以更高效、节约资源的方式将大量数据一次性写入Hologres,不仅提升了导入效率,还兼顾了资源利用率。您可以根据自身的业务特性和资源状况,灵活选择实时导入或批量导入。关于实时导入详情,请参见Flink全托管。
前提条件
已购买Hologres实例。具体操作,请参见购买Hologres。
已部署Flink 1.15及以上版本集群环境。具体操作,详情请参见
开源Flink:部署Flink。
阿里云实时计算Flink版:开通实时计算Flink版。
阿里云实时计算Flink版批量导入
通过连接HoloWeb并执行查询创建Hologres结果表,用于接收Flink导入的数据。本文以
test_sink_customer
表为例。-- 创建Hologres结果表 CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );
说明Flink源表需要与Hologres的结果表的字段名称和类型保持一致。
登录实时计算控制台,在作业运维页面单击部署作业,配置部署作业并单击部署。配置参数详情,请参见部署JAR作业。
其中主要参数介绍,如下表所示。
参数
说明
部署作业类型
选择为JAR。
部署模式
支持流模式或批模式,本文选择批模式。
引擎版本
JAR URI
上传开源Flink Connector:hologres-connector-flink-repartition.jar。
说明通过开源Flink Connector支持将数据批量导入Hologres,Flink Connector插件相关开源代码,详情请参见Hologres GitHub官方库。
Entry Point Class
程序的入口类。Flink Connector指定主类名称为:
com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample
。Entry Point Main Arguments
传入
repartition.sql
文件的路径参数。实时计算 Flink运行时,附加依赖文件的路径在/flink/usrlib/
下,因此完整的参数为:--sqlFilePath="/flink/usrlib/repartition.sql"
。附加依赖文件
上传
repartition.sql
文件。repartition.sql
是Flink SQL脚本的文件,主要用于定义数据源、结果表声明以及Hologres的连接信息,本文repartition.sql
文件的示例内容如下。--sourceDDL,本文使用了Flink DataGen公共测试数据作为源数据。 CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); --sourceDql,源表查询语句应当保证查询结果与sinkDDL声明的结果表对应,包括字段数量和字段类型。 SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL,结果表声明以及配置连接Hologres信息。 CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'LTAI5tJCNqeCY3DtKw8c****' ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );
说明repartition.sql
文件中连接Hologres的更多参数介绍详情,请参见Hologres Flink Connector参数说明。单击作业名称,进入部署详情看板,编辑资源配置,修改并发度。
说明建议与Hologres结果表的ShardCount数保持一致。
查询Hologres结果表。
Flink作业提交成功后,您可以在Hologres中查询写入的数据。示例语句如下。
SELECT * FROM test_sink_customer;
开源Flink批量导入
通过连接HoloWeb并执行查询创建Hologres结果表,用于接收Flink导入的数据。本文以
test_sink_customer
表为例。-- 创建Hologres结果表 CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );
说明您可以根据数据量合理设置Shard数,关于Shard详情,请参见Table Group与Shard Count操作指南。
创建
repartition.sql
文件并上传至Flink集群环境中的任意位置。本文以上传至/flink-1.15.4/src/repartition.sql
路径为例,repartition.sql
文件的示例内容如下。说明repartition.sql
是Flink SQL脚本的文件,主要用于定义数据源、结果表声明以及Hologres的连接信息。--sourceDDL,本文使用了Flink DataGen公共测试数据作为源数据。 CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); --sourceDql,源表查询语句应当保证查询结果与sinkDDL声明的结果表对应,包括字段数量和字段类型。 SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL,结果表声明以及配置连接Hologres信息。 CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'LTAI5tJCNqeCY3DtKw8c****' ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );
其中主要参数解释如下:
参数
是否必填
说明
connector
是
结果表类型,固定值为hologres。
dbname
是
Hologres的数据库名称。
tablename
是
Hologres接收数据的表名称。
username
是
当前阿里云账号的AccessKey ID。
您可以单击AccessKey 管理,获取AccessKey ID。
password
是
当前阿里云账号AccessKey ID对应的AccessKey Secret。
endpoint
是
Hologres的VPC网络地址。进入Hologres管理控制台的实例详情页,从实例配置获取Endpoint。
说明endpoint需包含端口号,格式为
ip:port
同一个区域使用VPC网络地址,跨区域请使用公共网络。jdbccopywritemode
否
数据写入方式。取值说明如下:
false(默认值):使用INSERT方式写入。
true:使用Copy方式写入,Copy写入方式分为流式Copy(Fixed Copy)和批量Copy,当前默认使用流式Copy(Fixed Copy)方式写入。
说明与使用INSERT方式写入相比,Fixed Copy方式可以实现更高的吞吐(因为采用流模式),更低的数据延时以及更低的客户端内存消耗(因为不需要攒批数据),但不支持数据回撤功能。
bulkload
否
是否使用批量Copy方式写入。取值说明如下:
true:使用批量Copy方式写入,仅jdbccopywritemode参数也设置为true时,该参数才会生效,否则使用Fixed Copy方式写入。
说明批量Copy相较于流式Copy(Fixed Copy),具备更高的效率,能更好地利用Hologres的资源,从而在数据写入过程中提供更优的性能,您可以根据业务需要,选择合适的数据写入方式。
在对主键表进行批量Copy写入时,通常会出现表锁的情况,您可以通过配置target-shards.enabled参数为true,将写入锁粒度降至Shard级别,从而允许并发执行多个批量导入任务,减少了表锁的发生。相比Fixed Copy模式,批量Copy写入有主键表时,通过这种方式能够显著降低Hologres实例的负载,实测显示,可以减少约66.7%的负载。
批量Copy写入时,如果目标表包含主键,要求在写入之前目标表为空表,否则写入过程中进行主键去重会影响写入性能。
false(默认值):不使用。
target-shards.enabled
否
是否启用Target Shard批量写入。取值说明如下:
true:启用Target Shard批量写入,当源数据已按Shard重新分区时,可以将写入锁粒度降至Shard级别。
false(默认值):不启用。
说明repartition.sql
文件中连接Hologres的更多参数介绍详情,请参见Hologres Flink Connector参数说明。在Flink集群环境中,上传开源Flink Connector:hologres-connector-flink-repartition.jar至任意目录下。本文以上传至根目录为例。
说明通过开源Flink Connector支持将数据批量导入Hologres,Flink Connector插件相关开源代码,详情请参见Hologres GitHub官方库。
提交Flink作业,代码示例如下。
./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"
上述参数说明:
Dexecution.runtime-mode:Flink作业的执行模式,详情请参见执行模式。
p:作业并发数。建议在配置作业并发数时取值与结果表的ShardCount相同,或者可以被ShardCount整除。
c:hologres-connector-flink-repartition.jar的主类名称以及所在的路径。
sqlFilePath:
repartition.sql
文件的路径。
查询Hologres结果表。
Flink作业提交成功后,您可以在Hologres中查询写入的数据。示例语句如下。
SELECT * FROM test_sink_customer;