本文介绍如何通过阿里云实时计算Flink版(基于VVP计算引擎)实时读取AnalyticDB PostgreSQL版的数据。

注意事项

该功能暂不支持AnalyticDB PostgreSQL版Serverless版本

前提条件

  • AnalyticDB PostgreSQL版实例和实时计算Flink集群需要位于同一VPC下。
  • 已创建实时计算Flink集群,创建操作,请参见开通流程
  • 已创建AnalyticDB PostgreSQL版实例,创建操作,请参见创建实例

配置实时计算Flink集群

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. Connectors页签,单击+图标上传AnalyticDB PostgreSQL版自定义Flink Connector的JAR包。
    上传Connector
    说明
    • 获取AnalyticDB PostgreSQL版自定义Flink Connector的JAR包,请参见AnalyticDB PostgreSQL Connector
    • JAR包的版本需要与实时计算平台的Flink引擎版本一致。
  5. 完成JAR包的上传后,单击继续
  6. 单击完成

配置AnalyticDB PostgreSQL版实例

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台
  2. 将Flink集群所属的网段加入AnalyticDB PostgreSQL版的白名单,如何添加白名单,请参见设置白名单
  3. 单击登录数据库,连接数据库的更多方式,请参见客户端连接
  4. AnalyticDB PostgreSQL版实例上创建一张表并插入测试数据。
    建表SQL和插入数据SQL的示例如下:
    CREATE TABLE test_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
    );
    
    INSERT INTO test_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 1000) AS t(i);

读取AnalyticDB PostgreSQL版数据

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 创建随机源表datagen_source、对应AnalyticDB PostgreSQL版数据的维表dim_adbpg以及用于结果展示的表print_table,具体步骤如下:
    1. 作业开发页面,单击新建
    2. 新建文件对话框中,输入文件名称,其他配置保持默认即可,单击确认
      新建文件
    3. 将以下作业代码拷贝到作业文本编辑区。
      DROP TABLE IF EXISTS datagen_source;
      CREATE TABLE datagen_source (
       f_sequence INT,
       f_random INT,
       f_random_str STRING
      ) WITH (
       'connector' = 'datagen',
       'rows-per-second'='5',
       'fields.f_sequence.kind'='sequence',
       'fields.f_sequence.start'='1',
       'fields.f_sequence.end'='1000',
       'fields.f_random.min'='1',
       'fields.f_random.max'='1000',
       'fields.f_random_str.length'='10'
      );
      
      
      DROP TABLE IF EXISTS dim_adbpg;
      CREATE TABLE dim_adbpg(
          id int,
          username varchar,
          PRIMARY KEY(id) not ENFORCED
      ) with(
          'connector' = 'adbpg-nightly-1.13',
          'password' = '*******',
         'tablename' = 'test_dim_table',
         'username' = 'testuser',
         'url' = 'jdbc:postgresql://gp-2z***************-master.gpdb.rds.aliyuncs.com:5432/why_test',
          'joinMaxRows'='100',
          'maxRetryTimes'='1',
          'connectionMaxActive'='5',
          'retryWaitTime'='100',
          'targetSchema'='public',
          'caseSensitive'='0',
          'cache'='lru',
          'cacheSize'='1000',
          'cacheTTLMs'='10000'
      );
      
      DROP TABLE IF EXISTS print_table;
      CREATE TABLE print_table (
        id INT,
        username varchar,
        f_random_str VARCHAR
      ) WITH (
        'connector'='print',
        'logger'='true'
      );

      其中datagen_source表和print_table表的参数无需修改,dim_adbpg表的参数需要根据您实际情况进行修改,参数说明如下:

      参数 是否必填 说明
      connector connector名称,固定为adbpg-nightly-版本号,例如adbpg-nightly-1.13
      url AnalyticDB PostgreSQL版的JDBC连接地址。格式为jdbc:postgresql://<内网地址>:<端口>/<连接的数据库名称>,示例如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres
      tablename AnalyticDB PostgreSQL版的表名。
      username AnalyticDB PostgreSQL版的数据库账号。
      password AnalyticDB PostgreSQL版的数据库账号密码。
      joinmaxrows 一对多JOIN时,左表一个记录JOIN右表后返回的最大记录数,默认值为1024。

      当一对多JOIN的记录数过多时,可能会影响流任务的性能,此时需要您增大缓存的内存。

      maxretrytimes SQL执行失败后重试次数,默认值为3。
      connectionmaxactive 维表内置连接池的最大连接数,默认值为5。
      retrywaittime 出现异常重试时间隔的时间。单位为毫秒(ms),默认值为100。
      targetschema AnalyticDB PostgreSQL版的Schema,默认为public。
      casesensitive 列名和表名是否区分大小写,取值说明:
      • 0(默认):不区分大小写。
      • 1:区分大小写。
      cache 缓存策略,取值说明:
      • none(默认):无缓存。
      • lru:缓存维表里的部分数据。源表来一条数据,系统会先查找缓存,如果没有找到该数据,则去物理维表中查询。

        LRU缓存策略需要配置cachesize和cachettlms参数。

      • all:缓存维表里的所有数据。在作业运行前,系统会将维表中所有数据加载到缓存中,之后所有的维表查询都会通过缓存进行查询。如果在缓存中无法找到数据,则表示KEY不存在,系统在缓存过期后重新加载一遍全量缓存。适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)较多的场景。

        ALL缓存策略需要配置cachettlms参数。

      cachesize LRU缓存的最大行数,默认值为10000行。
      cachettlms 缓存更新的时间间隔,单位为毫秒(ms)。系统会根据您设置的缓存更新时间间隔,重新加载一次维表中的最新数据,保证源表能JOIN到维表的最新数据。

      默认不设置该参数,表示不重新加载维表中的新数据。

      exceptionmode 数据读取过程中出现异常时的处理策略,取值说明:
      • ignore(默认值):忽略异常,仅返回异常发生前的数据。
      • strict:数据读取异常时,故障转移(Failover)并报错。
    4. 单击页面右上角的执行
  5. 读取AnalyticDB PostgreSQL版的数据,具体步骤如下:
    1. 将以下作业代码拷贝到作业文本编辑区。
      INSERT INTO print_table
      SELECT adbpg.id,adbpg.username,ds.f_random_str
      FROM datagen_source AS ds
       join
      dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS adbpg
      on ds.f_sequence = adbpg.id;
      说明 此处需要使用FOR SYSTEM_TIME AS OF PROCTIME()语法指定维表查询。具体介绍,请参见维表JOIN语句
    2. 单击页面右上角的执行
      如果您没有处于运行状态的Session集群,系统会提示必须配置 Preview Session 集群,配置步骤如下:
      1. 必须配置 Preview Session 集群对话框中单击确定
      2. 输入Session集群的名称,并打开设置为 SQL Previews 集群开关,其他配置保持默认即可。
      3. 单击创建 Session 集群
      4. 单击页面右上方的启动
      5. 待Session集群状态变为RUNNING后,返回作业开发页面重新执行作业。
    3. 调试数据配置对话框中,单击执行
    4. 当实时计算Flink版的结果框中返回查询结果时,表示已正常读取到AnalyticDB PostgreSQL版的数据,返回示例如下:
      返回结果

类型映射

以下内容为AnalyticDB PostgreSQL版和实时计算Flink的字段类型映射。

实时计算Flink字段类型 AnalyticDB PostgreSQL版字段类型
BOOLEAN BOOLEAN
TINYINT SAMLLINT
SAMLLINT SAMLLINT
INT INT
BIGINT BIGINT
DOUBLE DOUBLE PRECISION
VARCHAR TEXT
DATE DATE
FLOAT DOUBLE PRECISION
DECIMAL DECIMAL
TIME TIME
TIMESTAMP TIMESTAMP