本文介绍如何通过阿里云实时计算Flink版(基于VVP计算引擎)写入数据到AnalyticDB PostgreSQL版

注意事项

该功能暂不支持AnalyticDB PostgreSQL版Serverless版本

前提条件

  • AnalyticDB PostgreSQL版实例和实时计算Flink集群需要位于同一VPC下。
  • 已创建实时计算Flink集群,创建操作,请参见开通流程
  • 已创建AnalyticDB PostgreSQL版实例,创建操作,请参见创建实例

配置实时计算Flink集群

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. Connectors页签,单击+图标上传AnalyticDB PostgreSQL版自定义Flink Connector的JAR包。
    上传Connector
    说明
    • 获取AnalyticDB PostgreSQL版自定义Flink Connector的JAR包,请参见AnalyticDB PostgreSQL Connector
    • JAR包的版本需要与实时计算平台的Flink引擎版本一致。
  5. 完成JAR包的上传后,单击继续
  6. 单击完成

配置AnalyticDB PostgreSQL版实例

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台
  2. 将Flink集群所属的网段加入AnalyticDB PostgreSQL版的白名单,如何添加白名单,请参见设置白名单
  3. 单击登录数据库,连接数据库的更多方式,请参见客户端连接
  4. AnalyticDB PostgreSQL版实例上创建一张表。
    建表SQL示例如下:
    CREATE TABLE test_adbpg_table(
    b1 bigint,
    b2 bigint,
    b3 text,
    b4 json,
    PRIMARY KEY(b1)
    );

写入数据到AnalyticDB PostgreSQL版

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 创建随机源表datagen_source和对应AnalyticDB PostgreSQL版的表test_adbpg_table,具体步骤如下:
    1. 作业开发页面,单击新建
    2. 新建文件对话框中,输入文件名称,其他配置保持默认即可,单击确认
      新建文件
    3. 将以下作业代码拷贝到作业文本编辑区。
      CREATE TABLE datagen_source (
       f_sequence INT,
       f_random INT,
       f_random_str STRING
      ) WITH (
       'connector' = 'datagen',
       'rows-per-second'='5',
       'fields.f_sequence.kind'='sequence',
       'fields.f_sequence.start'='1',
       'fields.f_sequence.end'='1000',
       'fields.f_random.min'='1',
       'fields.f_random.max'='1000',
       'fields.f_random_str.length'='10'
      );
      
      CREATE TABLE test_adbpg_table (
          `B1` bigint   ,
          `B2` bigint  ,
          `B3` VARCHAR ,
          `B4` VARCHAR,
           PRIMARY KEY(B1) not ENFORCED
      ) with (
         'connector' = 'adbpg-nightly-1.13',
         'password' = 'xxx',
         'tablename' = 'test_adbpg_table',
         'username' = 'xxxx',
         'url' = 'jdbc:postgresql://url:5432/schema',
         'maxretrytimes' = '2',
         'batchsize' = '50000',
         'connectionmaxactive' = '5',
         'conflictmode' = 'ignore',
         'usecopy' = '0',
         'targetschema' = 'public',
         'exceptionmode' = 'ignore',
         'casesensitive' = '0',
         'writemode' = '1',
         'retrywaittime' = '200'
      );

      其中datagen_source表的参数无需修改,test_adbpg_table表的参数需要根据您实际情况进行修改,参数说明如下:

      参数 是否必填 说明
      connector connector名称,固定为adbpg-nightly-版本号,例如adbpg-nightly-1.13
      url AnalyticDB PostgreSQL版的JDBC连接地址。格式为jdbc:postgresql://<内网地址>:<端口>/<连接的数据库名称>,示例如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres
      tablename AnalyticDB PostgreSQL版的表名。
      username AnalyticDB PostgreSQL版的数据库账号。
      password AnalyticDB PostgreSQL版的数据库账号密码。
      maxretrytimes SQL执行失败后重试次数,默认为3次。
      batchsize 一次批量写入的最大条数,默认为50000条。
      exceptionmode 数据写入过程中出现异常时的处理策略。支持以下两种处理策略:
      • ignore(默认值):忽略出现异常时写入的数据。
      • strict:数据写入异常时,故障转移(Failover)并报错。
      conflictmode 当出现主键冲突或者唯一索引冲突时的处理策略。支持以下四种处理策略:
      • ignore :忽略主键冲突,保留之前的数据。
      • strict:主键冲突时,故障转移(Failover)并报错。
      • update:主键冲突时,更新新到的数据。
      • upsert(默认值):主键冲突时,采用UPSERT方式写入数据。

        AnalyticDB PostgreSQL版通过INSERT ON CONFLICTCOPY ON CONFLICT实现UPSERT写入数据,如果目标表为分区表,则需要内核小版本为V6.3.6.1及以上,如何升级内核小版本,请参见版本升级

      targetschema AnalyticDB PostgreSQL版的Schema,默认为public。
      writemode 写入方式。取值说明:
      • 0 :采用BATCH INSERT方式写入数据。
      • 1(默认值):采用COPY API写入数据。
      • 2:采用BATCH UPSERT方式写入数据。
      verbose 是否输出connector运行日志。取值说明:
      • 0(默认):不输出运行日志。
      • 1:输出运行日志。
      retrywaittime 出现异常重试时间隔的时间。单位为ms,默认值为100。
      batchwritetimeoutms 攒批写入数据时最长攒批时间,超过此事件会触发这一批的写入。单位为毫秒(ms),默认值为50000。
      connectionmaxactive 连接池参数,单个Task manager中连接池同一时刻最大连接数。默认值为5。
      casesensitive 列名和表名是否区分大小写,取值说明:
      • 0(默认):不区分大小写。
      • 1:区分大小写。
    4. 单击页面右上角的执行
    5. 将以下作业代码拷贝到作业文本编辑区。
      INSERT INTO test_adbpg_table
      SELECT f_sequence,f_random,f_random_str,'{ "customer": "value", "items": {"product": "Beer","qty": 6}}'
      FROM datagen_source;
    6. 单击页面右上角的上线
  5. 创建数据写入的Flink SQL作业,具体步骤如下:
    1. 将以下作业代码拷贝到作业文本编辑区。
      INSERT INTO test_adbpg_table
      SELECT f_sequence,f_random,f_random_str,'{ "customer": "value", "items": {"product": "Beer","qty": 6}}'
      FROM datagen_source;
    2. 单击页面右上角的上线
  6. 启动作业运维,具体步骤如下:
    1. 单击左侧导航栏中作业运维
    2. 作业运维页面,单击目标作业操作列的启动
    3. 单击确认启动

结果验证

  1. 连接AnalyticDB PostgreSQL版数据库。具体操作,请参见客户端连接
  2. 执行以下语句查询test_adbpg_table表。
    SELECT * FROM test_adbpg_table;

    数据正常写入到AnalyticDB PostgreSQL版中,返回示例如下:

    Flink查询结果

类型映射

以下内容为AnalyticDB PostgreSQL版和实时计算Flink的字段类型映射。

实时计算Flink字段类型 AnalyticDB PostgreSQL版字段类型
BOOLEAN BOOLEAN
TINYINT SAMLLINT
SAMLLINT SAMLLINT
INT INT
BIGINT BIGINT
DOUBLE DOUBLE PRECISION
VARCHAR TEXT
DATE DATE
FLOAT DOUBLE PRECISION
DECIMAL DECIMAL
TIME TIME
TIMESTAMP TIMESTAMP