开源Flink 1.11及以上版本实时写入

本文为您介绍开源Flink 1.11如何实时写入数据至Hologres。

前提条件

  • 开通Hologres实例,并连接开发工具,详情请参见连接HoloWeb

  • 搭建Flink集群(本次示例使用的是1.15版本),可以前往Flink官网下载二进制包,启动一个Standalone集群,详情请参见文档集群搭建

背景信息

从开源Flink1.11版本开始,Hologres代码已开源,相应版本的Connector已经在中央仓库发布Release包,可在项目中参照如下pom文件进行配置。详细内容请参见Hologres GitHub官方库

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.15</artifactId>
    <version>1.4.0</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

开源Flink版本与hologres-connector-flink最新版本对应关系如下,建议使用1.15及以上版本,功能更丰富:

Flink版本

Connector版本

Flink 1.11

hologres-connector-flink-1.11:1.0.1

Flink 1.12

hologres-connector-flink-1.12:1.0.1

Flink 1.13

hologres-connector-flink-1.13:1.3.2

Flink 1.14

hologres-connector-flink-1.14:1.3.2

Flink 1.15

hologres-connector-flink-1.15:1.4.1

Flink 1.17

hologres-connector-flink-1.17:1.4.1

Flink SQL写入数据至Hologres代码示例

您可以参照如下代码示例,通过将Flink SQL将数据写入Hologres。其中,更多详细的代码示例请参见Hologres GitHub官方库

        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        tEnv.executeSql(createHologresTable);

        createScanTable(tEnv);

        tEnv.executeSql("insert into sink select * from source");

更多详尽的代码示例请参见hologres-connector-flink-examples,包括如下示例。

  • FlinkSQLToHoloExample:一个使用纯Flink SQL接口实现的应用,将数据写入至Hologres。

  • FlinkDSAndSQLToHoloExample:一个混合Flink DataStream以及SQL 接口实现的应用,写入Hologres前,将DataStream转换成Table,之后再用Flink SQL写入Hologres。

  • FlinkDataStreamToHoloExample:一个使用纯Flink DataStream接口实现的应用,将数据写入至Hologres。

  • FlinkRoaringBitmapAggJob:一个使用Flink及RoaringBitmap,结合Hologres维表,实现实时去重统计UV的应用,并将统计结果写入Hologres。

  • 通过Flink DataStream接口的实时数据写入方法,可以对数据进行基于Hologres Shard的Repartition操作,有效减少写入Hologres实例时的小文件数量,从而提升写入性能并降低系统负载。该方法适用于需要批量导入具有主键的空表,实现类似Insert Overwrite的场景。

Hologres Flink Connector参数说明

您可以将Flink数据写入Hologres,Hologres Flink Connector相关参数具体内容如下:

参数

是否必填

说明

connector

结果表类型,固定值为hologres。

dbname

Hologres的数据库名称。

tablename

Hologres接收数据的表名称。

username

当前阿里云账号的AccessKey ID。

您可以单击AccessKey 管理,获取AccessKey ID。

password

当前阿里云账号的AccessKey Secret。

您可以单击AccessKey 管理,获取AccessKey Secret。

endpoint

Hologres的VPC网络地址。进入Hologres管理控制台的实例详情页,获取Endpoint。

说明

endpoint需包含端口号,格式为ip:port。同一个区域使用VPC网络地址,跨区域请使用公共网络。

  • 连接参数

    参数

    是否必填

    说明

    connectionSize

    单个Flink Hologres Task所创建的JDBC连接池大小。

    默认值:3,和吞吐成正比。

    connectionPoolName

    连接池名称,同一个TaskManager中,表配置同名的连接池名称可以共享连接池。

    无默认值,每个表默认使用自己的连接池。如果设置连接池名称,则所有表的connectionSize需要相同

    fixedConnectionMode

    写入和点查不占用连接数(beta功能,需要connector版本>=1.2.0,hologres引擎版本>=1.3)

    默认值:false

    jdbcRetryCount

    当连接故障时,写入和查询的重试次数。

    默认值:10。

    jdbcRetrySleepInitMs

    每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs。

    默认值:1000ms。

    jdbcRetrySleepStepMs

    每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs。

    默认值:5000ms。

    jdbcConnectionMaxIdleMs

    写入线程和点查线程数据库连接的最大Idle时间,超过连接将被释放。

    默认值:60000ms。

    jdbcMetaCacheTTL

    TableSchema信息的本地缓存时间。

    默认值:60000ms。

    jdbcMetaAutoRefreshFactor

    当TableSchema cache剩余存活时间短于metaCacheTTL/metaAutoRefreshFactor 将自动刷新cache。

    默认值:-1,表示不自动刷新。

    connection.ssl.mode

    是否启用数据传输加密。取值说明如下:

    • disable(默认值):不启用传输加密。

    • require:启用SSL,只对数据链路加密。

    • verify-ca:启用SSL,加密数据链路,同时使用CA证书验证Hologres服务端的真实性。

    • verify-full:启用SSL,加密数据链路,使用CA证书验证Hologres服务端的真实性,同时比对证书内的CN或DNS与连接时配置的Hologres连接地址是否一致。

    connection.ssl.root-cert.location

    CA证书的路径,且确保已经将CA证书上传至Flink集群环境。

    说明

    当connection.ssl.mode参数配置为verify-ca或verify-full时,需要配置此参数。

    jdbcDirectConnect

    是否开启直连。取值说明如下:

    • false(默认值):不开启。

    • true:开启。

      Flink批量写入的瓶颈是VIP Endpoint的网络吞吐。开启此参数会测试当前环境能否直连Hologres FE,若支持默认使用直连。

  • 写入参数

    参数

    是否必填

    说明

    mutatetype

    数据写入模式,详情请参见流式语义

    默认值:insertorignore。

    ignoredelete

    是否忽略撤回消息。通常Flink的Group By会产生回撤消息,回撤消息到Hologres connector会产生Delete请求。

    默认值:true,仅在使用流式语义时生效。

    createparttable

    当写入分区表时,是否自动根据分区值自动创建分区表。

    • false(默认值):不会自动创建。

    • true:自动创建。

    建议慎用该功能,确保分区值不会出现脏数据导致创建错误的分区表。

    ignoreNullWhenUpdate

    mutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。

    默认值:false。

    jdbcWriteBatchSize

    Hologres Sink节点数据攒批的最大批大小。

    默认值:256

    jdbcWriteBatchByteSize

    Hologres Sink节点单个线程数据攒批的最大字节大小。

    默认值:2097152(2 * 1024 * 1024),2MB。

    jdbcWriteBatchTotalByteSize

    Hologres Sink节点所有数据攒批的最大字节大小。

    默认值:20971520(20 * 1024 * 1024),20MB。

    jdbcWriteFlushInterval

    Hologres Sink节点数据攒批的最长Flush等待时间。

    默认值:10000,即10秒。

    jdbcUseLegacyPutHandler

    写入SQL格式,取值说明如下:

    • true:写入SQL格式为insert into xxx(c0,c1,...) values (?,?,...),... on conflict;

    • false(默认值):写入SQL格式为insert into xxx(c0,c1,...) select unnest(?),unnest(?),... on conflict

    jdbcEnableDefaultForNotNullColumn

    设置为true时,not null且未在表上设置default的字段传入null时,将以默认值写入。String类型默认为"",Number类型默认为0,Date、timestamp、timestamptz 类型默认为1970-01-01 00:00:00。

    默认值:true。

    remove-u0000-in-text.enabled

    是否自动替换TEXT数据类型中的非UTF-8的u0000字符。取值说明如下:

    • true:替换。

    • false(默认值):不替换。

    deduplication.enabled

    若一批写入的数据中有主键相同的数据,是否进行去重。取值说明如下:

    • true(默认值):进行去重,只保留后到达的这条数据。

    • false:不进行去重。

      首先将批处理数据写入,待写入完成后再继续写入后到达的数据。

      说明

      在不进行去重时,极端情况下(例如所有数据的主键都相同),写入操作会退化为非批量写入,对性能会产生一定影响。

    aggressive.enabled

    是否启用激进提交模式。取值说明如下:

    • true:启用。

      启用后,即使批量处理没有达到预期的条数,只要检测到连接空闲,系统就会强制提交数据。在流量较小时,这种方法可以有效减少数据传输的延时。

    • false(默认值):不启用。

    jdbcCopyWriteMode

    数据写入方式。取值说明如下:

    • true:使用Copy方式写入,Copy写入方式分为流式Copy(Fixed Copy)和批量Copy,当前默认使用流式Copy(Fixed Copy)方式写入。

      说明

      与使用INSERT方式写入相比,Fixed Copy方式可以实现更高的吞吐(因为采用流模式),更低的数据延时以及更低的客户端内存消耗(因为不需要攒批数据),但不支持数据回撤功能。

    • false(默认值):使用INSERT方式写入。

    说明

    仅Hologres V1.3.1及以上版本支持此参数。

    jdbcCopyWriteFormat

    底层是否采用二进制协议。

    • binary(默认值):表示使用二进制模式,二进制会更快。

    • text:为文本模式。

    说明

    仅Hologres V1.3.1及以上版本支持此参数。

    bulkLoad

    是否使用批量Copy方式写入。取值说明如下:

    • true:使用批量Copy方式写入,仅jdbcCopyWriteMode参数也设置为true时,该参数才会生效,否则使用Fixed Copy方式写入。

      说明
      • 批量Copy相较于流式Copy(Fixed Copy),具备更高的效率,能更好地利用Hologres的资源,从而在数据写入过程中提供更优的性能,您可以根据业务需要,选择合适的数据写入方式。

      • 在对主键表进行批量Copy写入时,通常会出现表锁的情况,您可以通过配置target-shards.enabled参数为true,将写入锁粒度降至Shard级别,从而允许并发执行多个批量导入任务,减少了表锁的发生。相比Fixed Copy模式,批量Copy写入有主键表时,通过这种方式能够显著降低Hologres实例的负载,实测显示,可以减少约66.7%的负载。

      • 批量Copy写入时,如果目标表包含主键,要求在写入之前目标表为空表,否则写入过程中进行主键去重会影响写入性能。

    • false(默认值):不使用。

    说明

    仅Hologres V1.4.0及以上版本支持此参数。

    target-shards.enabled

    是否启用Target Shard批量写入。取值说明如下:

    • true:启用Target Shard批量写入,当源数据已按Shard重新分区时,可以将写入锁粒度降至Shard级别。

    • false(默认值):不启用。

    说明

    仅Hologres V1.4.1及以上版本支持此参数。

  • 点查参数

    参数

    是否必填

    说明

    jdbcReadBatchSize

    维表点查最大批次大小。

    默认值:128。

    jdbcReadBatchQueueSize

    维表点查请求缓冲队列大小。

    默认值:256。

    async

    是否采用异步方式同步数据。

    默认值:false。异步模式可以并发地处理多个请求和响应,从而连续的请求之间不需要阻塞等待,提高查询的吞吐。但在异步模式下,无法保证请求的绝对顺序。

    cache

    缓存策略。

    默认值:None。Hologres仅支持以下两种缓存策略:None:无缓存。LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果未找到,则去物理维表中查找。

    cachesize

    缓存大小。

    默认值:10000。选择LRU缓存策略后,可以设置缓存大小。

    cachettlms

    更新缓存的时间间隔,单位为毫秒。

    当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。

    cacheempty

    是否缓存join结果为空的数据。

    默认值:true,表示缓存join结果为空的数据。false:表示不缓存join结果为空的数据。

  • 数据类型映射

    当前Flink全托管与Hologres的数据类型映射请参见Blink/Flink与Hologres的数据类型映射