通过实时计算Flink版写入数据到云原生数据仓库AnalyticDB PostgreSQL版

更新时间:

本文介绍如何通过阿里云实时计算Flink版写入数据到AnalyticDB PostgreSQL

使用限制

  • 该功能暂不支持AnalyticDB PostgreSQLServerless模式

  • Flink实时计算引擎VVR 6.0.0及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版连接器。

  • Flink实时计算引擎VVR 8.0.1及以上版本支持云原生数据仓库AnalyticDB PostgreSQL7.0版本。

    说明

    如果您使用了自定义连接器,具体操作请参见管理自定义连接器

前提条件

  • 已创建Flink全托管工作空间。具体操作,请参见开通Flink全托管

  • 已创建AnalyticDB PostgreSQL实例。具体操作,请参见创建实例

  • AnalyticDB PostgreSQL实例和Flink全托管工作空间需要位于同一VPC下。

配置AnalyticDB PostgreSQL版实例

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台
  2. Flink工作空间所属的网段加入AnalyticDB PostgreSQL的白名单。如何添加白名单,请参见设置白名单

  3. 单击登录数据库,连接数据库的更多方式,请参见客户端连接

  4. AnalyticDB PostgreSQL实例上创建一张表。

    建表SQL示例如下:

    CREATE TABLE test_adbpg_table(
    b1 int,
    b2 int,
    b3 text,
    PRIMARY KEY(b1)
    );

配置实时计算Flink

  1. 登录实时计算控制台

  2. Flink全托管页签,单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据连接

  4. 数据连接页面,单击创建自定义连接器

  5. 上传自定义连接器JAR文件。

    说明
    • 获取AnalyticDB PostgreSQL自定义Flink ConnectorJAR包,请参见AnalyticDB PostgreSQL Connector

    • JAR包的版本需要与实时计算平台的Flink引擎版本一致。

  6. 上传完成后,单击下一步

    系统会对您上传的自定义连接器内容进行解析。如果解析成功,您可以继续下一步。如果解析失败,请确认您上传的自定义连接器代码是否符合Flink社区标准。

  7. 单击完成

    创建完成的自定义连接器会出现在连接器列表中。

创建Flink作业

  1. 登录实时计算控制台,在Flink全托管页签,单击目标工作空间操作列下的控制台

  2. 在左侧导航栏,单击SQL开发,单击新建,选择空白的流作业草稿,单击下一步

  3. 新建文件草稿对话框,填写作业配置信息。

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    adbpg-test

    存储位置

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    作业草稿

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

    vvr-6.0.7-flink-1.15

  4. 单击创建

写入数据到AnalyticDB PostgreSQL

  1. 编写作业代码。

    创建随机源表datagen_source和对应AnalyticDB PostgreSQL的表test_adbpg_table,将以下作业代码拷贝到作业文本编辑区。

    CREATE TABLE datagen_source (
     f_sequence INT,
     f_random INT,
     f_random_str STRING
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='5',
     'fields.f_sequence.kind'='sequence',
     'fields.f_sequence.start'='1',
     'fields.f_sequence.end'='1000',
     'fields.f_random.min'='1',
     'fields.f_random.max'='1000',
     'fields.f_random_str.length'='10'
    );
    
    CREATE TABLE test_adbpg_table (
        `B1` bigint   ,
        `B2` bigint  ,
        `B3` VARCHAR ,
        `B4` VARCHAR,
         PRIMARY KEY(B1) not ENFORCED
    ) with (
       'connector' = 'adbpg-nightly-1.13',
       'password' = 'xxx',
       'tablename' = 'test_adbpg_table',
       'username' = 'xxxx',
       'url' = 'jdbc:postgresql://url:5432/schema',
       'maxretrytimes' = '2',
       'batchsize' = '50000',
       'connectionmaxactive' = '5',
       'conflictmode' = 'ignore',
       'usecopy' = '0',
       'targetschema' = 'public',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    其中datagen_source表的参数无需修改,test_adbpg_table表的参数需要根据您实际情况进行修改,参数说明如下:

    参数

    是否必填

    说明

    connector

    connector名称,固定为adbpg-nightly-版本号,例如adbpg-nightly-1.13

    url

    AnalyticDB PostgreSQLJDBC连接地址。格式为jdbc:postgresql://<内网地址>:<端口>/<连接的数据库名称>,示例如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres

    tablename

    AnalyticDB PostgreSQL的表名。

    username

    AnalyticDB PostgreSQL的数据库账号。

    password

    AnalyticDB PostgreSQL的数据库账号密码。

    maxretrytimes

    SQL执行失败后重试次数,默认为3次。

    batchsize

    一次批量写入的最大条数,默认为50000条。

    exceptionmode

    数据写入过程中出现异常时的处理策略。支持以下两种处理策略:

    • ignore(默认值):忽略出现异常时写入的数据。

    • strict:数据写入异常时,故障转移(Failover)并报错。

    conflictmode

    当出现主键冲突或者唯一索引冲突时的处理策略。支持以下四种处理策略:

    • ignore :忽略主键冲突,保留之前的数据。

    • strict:主键冲突时,故障转移(Failover)并报错。

    • update:主键冲突时,更新新到的数据。

    • upsert(默认值):主键冲突时,采用UPSERT方式写入数据。

      AnalyticDB PostgreSQL通过INSERT ON CONFLICTCOPY ON CONFLICT实现UPSERT写入数据,如果目标表为分区表,则需要内核小版本为V6.3.6.1及以上,如何升级内核小版本,请参见版本升级

    targetschema

    AnalyticDB PostgreSQLSchema,默认为public。

    writemode

    写入方式。取值说明:

    • 0 :采用BATCH INSERT方式写入数据。

    • 1(默认值):采用COPY API写入数据。

    • 2:采用BATCH UPSERT方式写入数据。

    verbose

    是否输出connector运行日志。取值说明:

    • 0(默认):不输出运行日志。

    • 1:输出运行日志。

    retrywaittime

    出现异常重试时间隔的时间。单位为ms,默认值为100。

    batchwritetimeoutms

    攒批写入数据时最长攒批时间,超过此事件会触发这一批的写入。单位为毫秒(ms),默认值为50000。

    connectionmaxactive

    连接池参数,单个Task manager中连接池同一时刻最大连接数。默认值为5。

    casesensitive

    列名和表名是否区分大小写,取值说明:

    • 0(默认):不区分大小写。

    • 1:区分大小写。

    说明

    支持参数和类型映射,详情请参见连接器云原生数据仓库AnalyticDB PostgreSQL

  2. 启动作业。

    1. 在作业开发页面顶部,单击部署,在弹出的对话框中,单击确定

      说明

      Session集群适用于非生产环境的开发测试环境,您可以使用Session集群模式调试作业,提高作业JM(Job Manager)资源利用率和作业启动速度。但不推荐您将作业提交至Session集群中,因为会存在业务稳定性问题,详情请参见作业调试

    2. 作业运维页面,单击目标作业操作列的启动

    3. 单击启动

观察同步结果

  1. 连接AnalyticDB PostgreSQL数据库。具体操作,请参见客户端连接

  2. 执行以下语句查询test_adbpg_table表。

    SELECT * FROM test_adbpg_table;

    数据正常写入到AnalyticDB PostgreSQL中,返回示例如下。

    adbpg2.png

相关文档