Read from and write to AnalyticDB for PostgreSQL


This topic describes how to use Realtime Compute for Apache Flink to read from and write to an AnalyticDB for PostgreSQL instance.

Background information

AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehousing service that provides online analytical processing (OLAP) for large-scale datasets. Realtime Compute for Apache Flink is an all-in-one real-time big data analytics platform built on Apache Flink. It provides connectors for a wide range of upstream and downstream services. By using Realtime Compute for Apache Flink to read from and write to AnalyticDB for PostgreSQL, you can combine real-time stream processing with the analytical capabilities of a cloud-native data warehouse.

Limits

  • This feature is not supported for AnalyticDB for PostgreSQL instances that run in serverless mode.

  • The AnalyticDB for PostgreSQL connector requires Realtime Compute for Apache Flink engine version VVR 6.0.0 or later.

  • AnalyticDB for PostgreSQL V7.0 requires Realtime Compute for Apache Flink engine version VVR 8.0.1 or later.

    Note

    If you use a custom connector, see Manage custom connectors.

Prerequisites

Before you begin, ensure that you have:

  • A fully managed Flink workspace. See Activate fully managed Flink.

  • An AnalyticDB for PostgreSQL instance. See Create an instance.

  • The AnalyticDB for PostgreSQL instance and the fully managed Flink workspace deployed in the same virtual private cloud (VPC).

Step 1: Configure the AnalyticDB for PostgreSQL instance

  1. Log on to the AnalyticDB for PostgreSQL console.
  2. Add the CIDR block of your Flink workspace to the IP address whitelist of the AnalyticDB for PostgreSQL instance. For more information, see Configure an IP address whitelist.

  3. Click Log On to Database. For more information about how to connect to the database, see Client connection.

  4. In the AnalyticDB for PostgreSQL instance, create a dimension table named adbpg_dim_table and insert 50 rows of test data.

    The following code creates the table and inserts data:

    -- Create a table named adbpg_dim_table.
    CREATE TABLE adbpg_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
    );
    
    -- Insert 50 rows of data into the adbpg_dim_table table. The value of the id field is an integer from 1 to 50, and the value of the username field is the 'username' string followed by the current row number.
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  5. Create a result table named adbpg_sink_table to store the data that the Flink job writes.

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );
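Optionally, before you create the Flink job, you can confirm that both tables are ready. The following queries are a minimal check to run in the same database session; they rely only on the two tables created in the steps above.

```sql
-- adbpg_dim_table should return row_count = 50, min_id = 1, and max_id = 50.
SELECT count(*) AS row_count, min(id) AS min_id, max(id) AS max_id
FROM adbpg_dim_table;

-- adbpg_sink_table should exist and return row_count = 0 until the Flink job runs.
SELECT count(*) AS row_count
FROM adbpg_sink_table;
```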

Step 2: Create a Flink job

  1. Log on to the Realtime Compute for Apache Flink console. Find the desired workspace and click Console in the Actions column.

  2. In the left-side navigation pane, click SQL Editor. Click New, select Blank Stream Draft, and then click Next.

  3. In the New Draft dialog box, configure the following parameters.

    • Name: The name of the job. The name must be unique within the current project. Example: adbpg-test.

    • Location: The folder where the job's code file is stored. You can also click the New Folder icon next to an existing folder to create a subfolder. Example: Drafts.

    • Engine Version: The Flink engine version for the job. For more information about engine versions, see Engine versions. Example: vvr-6.0.7-flink-1.15. If your instance runs AnalyticDB for PostgreSQL V7.0, select VVR 8.0.1 or later.

  4. Click Create.

Step 3: Write and deploy the job code

  1. Copy the following code into the editor.

    -- Create a Datagen source table.
    -- The table generates 50 rows with sequential IDs from 1 to 50 and random scores from 70 to 100.
    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      score INT
    ) WITH (
      'connector' = 'datagen',
      'fields.id.kind' = 'sequence',
      'fields.id.start' = '1',
      'fields.id.end' = '50',
      'fields.score.kind' = 'random',
      'fields.score.min' = '70',
      'fields.score.max' = '100'
    );
    
    -- Create a dimension table that reads from adbpg_dim_table through a temporal (lookup) join.
    CREATE TEMPORARY TABLE dim_adbpg (
      id INT,
      username VARCHAR,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'adbpg',
      'url' = 'jdbc:postgresql://<Internal endpoint>:<Port>/<Database name>',
      'tablename' = 'adbpg_dim_table',
      'username' = '<Database account>',
      'password' = '<Password>',
      'maxJoinRows' = '100',
      'maxRetryTimes' = '1',
      'cache' = 'lru',
      'cacheSize' = '1000'
    );
    
    -- Create a result table that writes the joined rows to adbpg_sink_table.
    CREATE TEMPORARY TABLE sink_adbpg (
      id INT,
      username VARCHAR,
      score INT
    ) WITH (
      'connector' = 'adbpg',
      'url' = 'jdbc:postgresql://<Internal endpoint>:<Port>/<Database name>',
      'tablename' = 'adbpg_sink_table',
      'username' = '<Database account>',
      'password' = '<Password>',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- Join each source row with the matching dimension row and insert the result into the result table.
    INSERT INTO sink_adbpg
    SELECT ts.id, ts.username, ds.score
    FROM datagen_source AS ds
    JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
      ON ds.id = ts.id;
  2. Replace the placeholder values in the code with your connection details. The following required parameters must be set:

    • url (required): The JDBC connection URL of the AnalyticDB for PostgreSQL instance, in the format jdbc:postgresql://<Internal endpoint>:<Port>/<Database name>. Example: jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres.

    • tablename (required): The name of the table in AnalyticDB for PostgreSQL.

    • username (required): The database account of the AnalyticDB for PostgreSQL instance.

    • password (required): The password of the database account.

    Note

    For a full list of parameters and data type mappings, see the AnalyticDB for PostgreSQL connector documentation.

  3. In the upper-right corner of the job development page, click Validate.

  4. Click Deploy.

  5. On the Deployments page, click Resume.

Step 4: Verify the results

  1. Log on to the AnalyticDB for PostgreSQL console.
  2. Click Log On to Database. For more information, see Client connection.

  3. Run the following query to view the data written by Flink:

    SELECT * FROM adbpg_sink_table;

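To check the output more precisely than by scanning all rows, you can run aggregate queries such as the following. This is a sketch that assumes the job has run once to completion; because adbpg_sink_table has no primary key and the sink uses insert mode, running the job again may add duplicate rows.

```sql
-- Expect row_count = 50, distinct_ids = 50, min_id = 1, and max_id = 50.
SELECT count(*) AS row_count,
       count(DISTINCT id) AS distinct_ids,
       min(id) AS min_id,
       max(id) AS max_id
FROM adbpg_sink_table;

-- Expect mismatches = 0: every username should match the dimension table,
-- and every score should fall within the Datagen range of 70 to 100.
SELECT count(*) AS mismatches
FROM adbpg_sink_table AS s
JOIN adbpg_dim_table AS d ON s.id = d.id
WHERE s.username <> d.username
   OR s.score NOT BETWEEN 70 AND 100;
```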

Related documentation