Flink全托管产品(Flink Serverless)是基于Apache Flink构建的全托管产品,为您提供全托管的实时计算服务。本文为您介绍Flink全托管如何创建Hologres源表。

使用说明

  • Hologres源表默认使用批模式读取数据,即对全表数据仅扫描一次。扫描结束,消费即结束,扫描结束后输入的数据将不会被Hologres源表读取。
  • 全托管Flink从VVP-2.4版本开始,支持实时消费Hologres的数据,请您升级全托管Flink至VVP-2.4版本。

DDL定义

  • DDL语句
    创建Hologres源表的DDL语句如下。
    create table hologres_source(
      name varchar,
      age BIGINT,
      birthday BIGINT
    ) with (
      'connector'='hologres',
      'dbname'='<yourDbname>', --Hologres的数据库名称。
      'tablename'='<yourTablename>',  --Hologres用于接收数据的表名称。
      'username'='<yourAccessID>',  --当前阿里云账号的AccessKey ID。
      'password'='<yourAccessSecret>',  --当前阿里云账号的AccessKey Secret。
      'endpoint'='<vpc_ip:vpc_port>',  --当前Hologres实例VPC网络的Endpoint。
      'field_delimiter'='|' --该参数为可选参数。
    );
    说明 Flink全托管不支持在源表中定义计算列。
  • 参数说明
    With参数的描述如下表所示。
    参数 描述 是否必填
    connector 源表类型。

    固定值为hologres

    dbname Hologres的数据库名称。
    tablename Hologres用于接收数据的表名称。
    username 当前阿里云账号的AccessKey ID。

    您可以登录AccessKey 管理,获取AccessKey ID。

    password 当前阿里云账号的AccessKey Secret。

    您可以登录AccessKey 管理,获取AccessKey Secret。

    endpoint Hologres的VPC网络地址。
    您可以登录Hologres管控台,进入目标实例的详情页,在实例配置中获取Endpoint。Endpoint需包含端口号,格式为ip:port。
    说明 如果Flink与Hologres实例部署在同一个地域,请使用VPC网络的网络地址。如果在不同地域,请使用公共网络的网络地址,并确保Flink集群能正常访问公网(公网网络延迟较高)。
    field_delimiter 导出数据时,不同行之间使用的分隔符。

    默认值为"\u0002"

    binlog 是否为Binlog source,VVP 2.4版本开始支持,默认为false。如果需要消费,需要将binlog设置为true
    binlogMaxRetryTimes 读取Binlog出错重试次数,默认为60次。
    binlogRetryIntervalMs 读取Binlog出错重试间隔,默认为2000ms。
    binlogBatchReadSize 读取Binlog批量大小,默认为16个。
    cdcMode 读取Binlog时是否采用CDC模式,默认为false。详请参见Flink实时消费Binlog
    startTime 启动位点的时间,格式为yyyy-MM-dd hh:mm:ss。如果没有设置该参数,且作业没有从状态恢复,则从最早的Binlog开始消费Hologres数据。
  • 使用示例
    创建Hologres的源表并导入Flink的数据,示例SQL语句如下。
    CREATE TEMPORARY TABLE hologres_source (
      name varchar, 
      age BIGINT,
      birthday BIGINT
    ) with (
      'connector'='hologres',
      'dbname'='<yourDbname>',
      'tablename'='<yourTablename>',
      'username'='<yourAccessID>',
      'password'='<yourAccessSecret>',
      'endpoint'='<yourEndpoint>',
      'field_delimiter'='|'  --该参数可选。
    );
    CREATE TEMPORARY TABLE blackhole_sink(
      name varchar,
      age BIGINT,
      birthday BIGINT 
    ) with (
      'connector'='blackhole'
    );
    INSERT INTO blackhole_sink
    SELECT 
       name, age, birthday
    from hologres_source;

Flink实时消费Binlog

Flink VVP-2.4及以上版本,Hologres Connector支持实时消费Binlog,详情请参见Flink实时消费Binlog

数据类型映射

Flink全托管与Hologres的数据类型映射,请参见数据类型汇总