Flink批量导入Hologres

Hologres推出的新版Flink Connector插件,支持通过Flink将数据批量导入到Hologres,实现高效且低负载的数据导入。

背景信息

在大数据处理领域,Hologres作为一款强大的在线分析处理(OLAP)系统,与Flink的集成提供了强大的实时数据流处理能力。然而,对数据时效性要求不高的场景,如历史数据的批量加载、离线数据处理或日志聚合等任务,推荐使用Flink批量导入Hologres。批量导入能够以更高效、节约资源的方式将大量数据一次性写入Hologres,不仅提升了导入效率,还兼顾了资源利用率。您可以根据自身的业务特性和资源状况,灵活选择实时导入或批量导入。关于实时导入详情,请参见Flink全托管

前提条件

阿里云实时计算Flink版批量导入

  1. 通过连接HoloWeb并执行查询创建Hologres结果表,用于接收Flink导入的数据。本文以test_sink_customer表为例。

    -- 创建Hologres结果表
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    说明

    Flink源表需要与Hologres的结果表的字段名称和类型保持一致。

  2. 登录实时计算控制台,在作业运维页面单击部署作业,配置部署作业并单击部署。配置参数详情,请参见部署JAR作业

    其中主要参数介绍,如下表所示。

    参数

    说明

    部署作业类型

    选择为JAR。

    部署模式

    支持流模式或批模式,本文选择批模式。

    引擎版本

    引擎版本详情请参见引擎版本介绍生命周期策略。本文以vvr-8.0.7-flink-1.17版本为例。

    JAR URI

    上传开源Flink Connector:hologres-connector-flink-repartition.jar

    说明

    通过开源Flink Connector支持将数据批量导入Hologres,Flink Connector插件相关开源代码,详情请参见Hologres GitHub官方库

    Entry Point Class

    程序的入口类。Flink Connector指定主类名称为:com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample

    Entry Point Main Arguments

    传入repartition.sql文件的路径参数。实时计算 Flink运行时,附加依赖文件的路径在/flink/usrlib/下,因此完整的参数为:--sqlFilePath="/flink/usrlib/repartition.sql"

    附加依赖文件

    上传repartition.sql文件。repartition.sql是Flink SQL脚本的文件,主要用于定义数据源、结果表声明以及Hologres的连接信息,本文repartition.sql文件的示例内容如下。

    --sourceDDL,本文使用了Flink DataGen公共测试数据作为源数据。
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    --sourceDql,源表查询语句应当保证查询结果与sinkDDL声明的结果表对应,包括字段数量和字段类型。
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL,结果表声明以及配置连接Hologres信息。
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'LTAI5tJCNqeCY3DtKw8c****'
      ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );
    说明

    repartition.sql文件中连接Hologres的更多参数介绍详情,请参见Hologres Flink Connector参数说明

  3. 单击作业名称,进入部署详情看板,编辑资源配置,修改并发度

    说明

    建议与Hologres结果表的ShardCount数保持一致。

  4. 查询Hologres结果表。

    Flink作业提交成功后,您可以在Hologres中查询写入的数据。示例语句如下。

    SELECT * FROM test_sink_customer;

开源Flink批量导入

  1. 通过连接HoloWeb并执行查询创建Hologres结果表,用于接收Flink导入的数据。本文以test_sink_customer表为例。

    -- 创建Hologres结果表
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    说明

    您可以根据数据量合理设置Shard数,关于Shard详情,请参见Table Group与Shard Count操作指南

  2. 创建repartition.sql文件并上传至Flink集群环境中的任意位置。本文以上传至/flink-1.15.4/src/repartition.sql路径为例,repartition.sql文件的示例内容如下。

    说明

    repartition.sql是Flink SQL脚本的文件,主要用于定义数据源、结果表声明以及Hologres的连接信息。

    --sourceDDL,本文使用了Flink DataGen公共测试数据作为源数据。
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    --sourceDql,源表查询语句应当保证查询结果与sinkDDL声明的结果表对应,包括字段数量和字段类型。
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL,结果表声明以及配置连接Hologres信息。
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'LTAI5tJCNqeCY3DtKw8c****'
      ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );

    其中主要参数解释如下:

    参数

    是否必填

    说明

    connector

    结果表类型,固定值为hologres。

    dbname

    Hologres的数据库名称。

    tablename

    Hologres接收数据的表名称。

    username

    当前阿里云账号的AccessKey ID。

    您可以单击AccessKey 管理,获取AccessKey ID。

    password

    当前阿里云账号AccessKey ID对应的AccessKey Secret。

    endpoint

    Hologres的VPC网络地址。进入Hologres管理控制台的实例详情页,从实例配置获取Endpoint。

    说明

    endpoint需包含端口号,格式为ip:port同一个区域使用VPC网络地址,跨区域请使用公共网络。

    jdbccopywritemode

    数据写入方式。取值说明如下:

    • false(默认值):使用INSERT方式写入。

    • true:使用Copy方式写入,Copy写入方式分为流式Copy(Fixed Copy)和批量Copy,当前默认使用流式Copy(Fixed Copy)方式写入。

      说明

      与使用INSERT方式写入相比,Fixed Copy方式可以实现更高的吞吐(因为采用流模式),更低的数据延时以及更低的客户端内存消耗(因为不需要攒批数据),但不支持数据回撤功能。

    bulkload

    是否使用批量Copy方式写入。取值说明如下:

    • true:使用批量Copy方式写入,仅jdbccopywritemode参数也设置为true时,该参数才会生效,否则使用Fixed Copy方式写入。

      说明
      • 批量Copy相较于流式Copy(Fixed Copy),具备更高的效率,能更好地利用Hologres的资源,从而在数据写入过程中提供更优的性能,您可以根据业务需要,选择合适的数据写入方式。

      • 在对主键表进行批量Copy写入时,通常会出现表锁的情况,您可以通过配置target-shards.enabled参数为true,将写入锁粒度降至Shard级别,从而允许并发执行多个批量导入任务,减少了表锁的发生。相比Fixed Copy模式,批量Copy写入有主键表时,通过这种方式能够显著降低Hologres实例的负载,实测显示,可以减少约66.7%的负载。

      • 批量Copy写入时,如果目标表包含主键,要求在写入之前目标表为空表,否则写入过程中进行主键去重会影响写入性能。

    • false(默认值):不使用。

    target-shards.enabled

    是否启用Target Shard批量写入。取值说明如下:

    • true:启用Target Shard批量写入,当源数据已按Shard重新分区时,可以将写入锁粒度降至Shard级别。

    • false(默认值):不启用。

    说明

    repartition.sql文件中连接Hologres的更多参数介绍详情,请参见Hologres Flink Connector参数说明

  3. 在Flink集群环境中,上传开源Flink Connector:hologres-connector-flink-repartition.jar至任意目录下。本文以上传至根目录为例。

    说明

    通过开源Flink Connector支持将数据批量导入Hologres,Flink Connector插件相关开源代码,详情请参见Hologres GitHub官方库

  4. 提交Flink作业,代码示例如下。

    ./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"

    上述参数说明:

    • Dexecution.runtime-mode:Flink作业的执行模式,详情请参见执行模式

    • p:作业并发数。建议在配置作业并发数时取值与结果表的ShardCount相同,或者可以被ShardCount整除。

    • c:hologres-connector-flink-repartition.jar的主类名称以及所在的路径。

    • sqlFilePath:repartition.sql文件的路径。

  5. 查询Hologres结果表。

    Flink作业提交成功后,您可以在Hologres中查询写入的数据。示例语句如下。

    SELECT * FROM test_sink_customer;