通过Flink读写AnalyticDB PostgreSQL数据

更新时间:

本文介绍如何通过阿里云实时计算Flink版实时读写云原生数据仓库AnalyticDB PostgreSQL数据。

背景信息

云原生数据仓库AnalyticDB PostgreSQL是一种大规模并行处理(MPP)数据仓库服务,可提供海量数据在线分析服务。实时计算Flink是基于Apache Flink构建的⼀站式实时大数据分析平台,内置丰富上下游连接器,满足不同业务场景的需求,提供高效、灵活的实时计算服务。通过实时计算Flink版读取AnalyticDB PostgreSQL版数据,可以充分发挥云原生数据仓库的优势,提高数据分析的效率和精度。

使用限制

  • 该功能暂不支持AnalyticDB PostgreSQLServerless模式

  • Flink实时计算引擎VVR 6.0.0及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版连接器。

  • Flink实时计算引擎VVR 8.0.1及以上版本支持云原生数据仓库AnalyticDB PostgreSQL7.0版本。

    说明

    如果您使用了自定义连接器,具体操作请参见管理自定义连接器

前提条件

  • 已创建Flink全托管工作空间。具体操作,请参见开通Flink全托管

  • 已创建AnalyticDB PostgreSQL实例。具体操作,请参见创建实例

  • AnalyticDB PostgreSQL实例和Flink全托管工作空间需要位于同一VPC下。

步骤一:配置AnalyticDB PostgreSQL版实例

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台
  2. Flink工作空间所属的网段加入AnalyticDB PostgreSQL的白名单。如何添加白名单,请参见设置白名单

  3. 单击登录数据库,连接数据库的更多方式,请参见客户端连接

  4. AnalyticDB PostgreSQL实例上创建一张名为adbpg_dim_table的维表并插入50条测试数据。

    建表SQL和插入数据SQL的示例如下:

    --创建名称为adbpg_dim_table的表。
    CREATE TABLE adbpg_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
    );
    
    --向adbpg_dim_table的表中插入50行数据,其中id字段的值为从150的整数,而username字段的值为username字符串后面跟随当前行数的文本表示。
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  5. 创建一张名为adbpg_sink_table的结果表,用于Flink写入结果数据。

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );

步骤二:创建Flink作业

  1. 登录实时计算控制台,在Flink全托管页签,单击目标工作空间操作列下的控制台

  2. 在左侧导航栏,单击SQL开发,单击新建,选择空白的流作业草稿,单击下一步

  3. 新建文件草稿对话框,填写作业配置信息。

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    adbpg-test

    存储位置

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    作业草稿

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

    vvr-6.0.7-flink-1.15

  4. 单击创建

步骤三:编写作业代码并部署

  1. 将以下作业代码拷贝到作业文本编辑区。

    ---创建一个datagen源表。
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='50',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    --创建adbpg维表。
    CREATE TEMPORARY TABLE dim_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) not ENFORCED
    ) WITH(
     'connector' = 'adbpg', 
     'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
     'tablename' = 'adbpg_dim_table', 
     'username' = 'flink****test',
     'password' = '*******',
     'maxJoinRows'='100',
     'maxRetryTimes'='1', 
     'cache'='lru',
     'cacheSize'='1000'
    );
    
    --创建adbpg结果表。
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_sink_table',  
      'username' = 'flink****test',
      'password' = '******',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    --维表和源表join后的结果插入adbpg结果表。
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score
    FROM datagen_source AS ds
     join
    dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    on ds.id = ts.id;
  2. 根据实际情况修改以下参数,参数说明如下。

    参数

    是否必填

    说明

    URL

    AnalyticDB PostgreSQLJDBC连接地址。格式为jdbc:postgresql://<内网地址>:<端口>/<连接的数据库名称>,示例如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres

    tablename

    AnalyticDB PostgreSQL的表名。

    username

    AnalyticDB PostgreSQL的数据库账号。

    password

    AnalyticDB PostgreSQL的数据库账号密码。

    说明

    更多参数和类型映射,详情请参见连接器云原生数据仓库AnalyticDB PostgreSQL版(ADB PG)

  3. 在作业开发页面顶部,单击深度检查,进行语法检查。

  4. 单击部署

  5. 作业运维页面,单击启动

步骤四:查看Flink写入数据

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台
  2. 单击登录数据库,连接数据库的更多方式,请参见客户端连接

  3. 执行如下查询语句,查看Flink写入数据。

    SELECT * FROM adbpg_sink_table;
    image.png

相关文档