本文为您介绍开源Flink 1.11如何实时写入数据至Hologres。

背景信息

从开源Flink1.11版本开始,Hologres代码已开源。详细内容请参见Hologres GitHub官方库

Flink SQL写入数据至Hologres代码示例

您可以参照如下代码示例,通过将Flink SQL将数据写入Hologres。其中,更多详细的代码示例请参见Hologres GitHub官方库
        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        tEnv.executeSql(createHologresTable);

        createScanTable(tEnv);

        tEnv.executeSql("insert into sink select * from source");

Hologres Flink Connector参数说明

您可以将Flink数据写入Hologres,Hologres Flink Connector相关参数具体内容如下:
参数 是否必填 说明
connector 结果表类型,固定值为hologres。
dbname Hologres的数据库名称。
tablename Hologres接收数据的表名称。
username 当前阿里云账号的AccessKey ID。

您可以单击AccessKey 管理,获取AccessKey ID。

password 当前阿里云账号的AccessKey Secret。

您可以单击AccessKey 管理,获取AccessKey Secret。

endpoint Hologres的VPC网络地址。进入Hologres管理控制台的实例详情页,从实例配置获取Endpoint。
说明 endpoint需包含端口号,格式为ip:port同一个区域使用VPC网络地址,跨区域请使用公共网络。
mutatetype 数据写入模式,默认值为insertorignore
ignoredelete 是否忽略撤回消息。通常Flink的Group by会产生回撤消息,回撤消息到Hologres Connector会产生Delete请求。默认值为false
说明 仅在使用流式语义时生效。
partitionrouter 是否写入分区表。默认值为false
createparttable 当写入分区表时,是否自动根据分区值自动创建分区表。具体取值如下:
  • false(默认值):不会自动创建分区表。
  • true:自动创建分区表。
说明 Flink 全托管2.1.x版本及以上支持自动创建分区表。建议您在使用该功能之前,确保分区值不会出现脏数据,否则会导致分区表创建错误。
connectionSize 单个Flink Hologres Task所创建的JDBC连接池大小。默认值为3,和吞吐成正比。
jdbcWriteBatchSize Hologres Sink节点数据攒批的最大批大小(JDBC模式)。默认值为256。
jdbcWriteFlushInterval Hologres Sink节点数据攒批的最长Flush等待时间(JDBC模式)默认值为10000ms,即10秒。